This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 01a1e5fba [FLINK-36656][mysql] Fix type conversion failure for
newly-added sharding table with mysql boolean type (#3683)
01a1e5fba is described below
commit 01a1e5fba55038529a1eaf7758f85d3972d8e0ea
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Nov 19 23:36:10 2024 -0800
[FLINK-36656][mysql] Fix type conversion failure for newly-added sharding
table with mysql boolean type (#3683)
---
.../MySqlDeserializationConverterFactory.java | 19 ++
.../mysql/table/MySqlConnectorITCase.java | 72 -----
.../table/MySqlConnectorShardingTableITCase.java | 359 +++++++++++++++++++++
3 files changed, 378 insertions(+), 72 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
index ff5f69a06..50eccc176 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java
@@ -53,6 +53,8 @@ public class MySqlDeserializationConverterFactory {
public Optional<DeserializationRuntimeConverter>
createUserDefinedConverter(
LogicalType logicalType, ZoneId serverTimeZone) {
switch (logicalType.getTypeRoot()) {
+ case TINYINT:
+ return createTinyIntConverter();
case CHAR:
case VARCHAR:
return createStringConverter();
@@ -148,6 +150,23 @@ public class MySqlDeserializationConverterFactory {
}
}
+ private static Optional<DeserializationRuntimeConverter>
createTinyIntConverter() {
+
+ return Optional.of(
+ new DeserializationRuntimeConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws
Exception {
+ if (dbzObj instanceof Boolean) {
+ return dbzObj == Boolean.TRUE ? (byte) 1 : (byte)
0;
+ } else {
+ return Byte.parseByte(dbzObj.toString());
+ }
+ }
+ });
+ }
+
private static boolean hasFamily(LogicalType logicalType,
LogicalTypeFamily family) {
return logicalType.getTypeRoot().getFamilies().contains(family);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 1826448b2..799e96ffa 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -1592,78 +1592,6 @@ public class MySqlConnectorITCase extends
MySqlSourceTestBase {
jobClient.cancel().get();
}
- @Test
- public void testShardingTablesWithInconsistentSchema() throws Exception {
- userDatabase1.createAndInitialize();
- userDatabase2.createAndInitialize();
- String sourceDDL =
- String.format(
- "CREATE TABLE `user` ("
- + " `id` DECIMAL(20, 0) NOT NULL,"
- + " name STRING,"
- + " address STRING,"
- + " phone_number STRING,"
- + " email STRING,"
- + " age INT,"
- + " primary key (`id`) not enforced"
- + ") WITH ("
- + " 'connector' = 'mysql-cdc',"
- + " 'hostname' = '%s',"
- + " 'port' = '%s',"
- + " 'username' = '%s',"
- + " 'password' = '%s',"
- + " 'database-name' = '%s',"
- + " 'table-name' = '%s',"
- + " 'scan.incremental.snapshot.enabled' =
'%s',"
- + " 'server-time-zone' = 'UTC',"
- + " 'server-id' = '%s',"
- + " 'scan.incremental.snapshot.chunk.size' =
'%s'"
- + ")",
- MYSQL_CONTAINER.getHost(),
- MYSQL_CONTAINER.getDatabasePort(),
- userDatabase1.getUsername(),
- userDatabase1.getPassword(),
- String.format(
- "(%s|%s)",
- userDatabase1.getDatabaseName(),
userDatabase2.getDatabaseName()),
- "user_table_.*",
- incrementalSnapshot,
- getServerId(),
- getSplitSize());
- tEnv.executeSql(sourceDDL);
-
- // async submit job
- TableResult result = tEnv.executeSql("SELECT * FROM `user`");
-
- CloseableIterator<Row> iterator = result.collect();
- waitForSnapshotStarted(iterator);
-
- try (Connection connection = userDatabase1.getJdbcConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("UPDATE user_table_1_1 SET email =
'[email protected]' WHERE id=111;");
- }
-
- try (Connection connection = userDatabase2.getJdbcConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE
id=221;");
- }
-
- String[] expected =
- new String[] {
- "+I[111, user_111, Shanghai, 123567891234,
[email protected], null]",
- "-U[111, user_111, Shanghai, 123567891234,
[email protected], null]",
- "+U[111, user_111, Shanghai, 123567891234,
[email protected], null]",
- "+I[121, user_121, Shanghai, 123567891234, null, null]",
- "+I[211, user_211, Shanghai, 123567891234, null, null]",
- "+I[221, user_221, Shanghai, 123567891234, null, 18]",
- "-U[221, user_221, Shanghai, 123567891234, null, 18]",
- "+U[221, user_221, Shanghai, 123567891234, null, 20]",
- };
-
- assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator,
expected.length));
- result.getJobClient().get().cancel().get();
- }
-
@Test
public void testStartupFromSpecificBinlogFilePos() throws Exception {
inventoryDatabase.createAndInitialize();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java
new file mode 100644
index 000000000..9297465dc
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.table;
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Stream;
+
+/** Integration tests for MySQL shardding tables. */
+@RunWith(Parameterized.class)
+public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MySqlConnectorShardingTableITCase.class);
+
+ private static final String TEST_USER = "mysqluser";
+ private static final String TEST_PASSWORD = "mysqlpw";
+
+ private static final MySqlContainer MYSQL8_CONTAINER =
+ createMySqlContainer(MySqlVersion.V8_0,
"docker/server-gtids/expire-seconds/my.cnf");
+
+ private final UniqueDatabase fullTypesMySql57Database =
+ new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER,
TEST_PASSWORD);
+
+ private final UniqueDatabase userDatabase1 =
+ new UniqueDatabase(MYSQL_CONTAINER, "user_1", TEST_USER,
TEST_PASSWORD);
+ private final UniqueDatabase userDatabase2 =
+ new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER,
TEST_PASSWORD);
+
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ private final StreamTableEnvironment tEnv =
+ StreamTableEnvironment.create(
+ env,
EnvironmentSettings.newInstance().inStreamingMode().build());
+
+ // enable the incrementalSnapshot (i.e: The new source MySqlParallelSource)
+ private final boolean incrementalSnapshot;
+
+ public MySqlConnectorShardingTableITCase(boolean incrementalSnapshot) {
+ this.incrementalSnapshot = incrementalSnapshot;
+ }
+
+ @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
+ public static Object[] parameters() {
+ return new Object[][] {new Object[] {false}, new Object[] {true}};
+ }
+
+ @BeforeClass
+ public static void beforeClass() {
+ LOG.info("Starting MySql8 containers...");
+ Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+ LOG.info("Container MySql8 is started.");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ LOG.info("Stopping MySql8 containers...");
+ MYSQL8_CONTAINER.stop();
+ LOG.info("Container MySql8 is stopped.");
+ }
+
+ @Before
+ public void before() {
+ TestValuesTableFactory.clearAllData();
+ if (incrementalSnapshot) {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(200);
+ } else {
+ env.setParallelism(1);
+ }
+ }
+
+ @Test
+ public void testShardingTablesWithTinyInt1() throws Exception {
+ fullTypesMySql57Database.createAndInitialize();
+ try (Connection connection =
fullTypesMySql57Database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(String.format("USE %s",
fullTypesMySql57Database.getDatabaseName()));
+ statement.execute(
+ "CREATE TABLE sharding_table_1("
+ + "id BIGINT,"
+ + "status BOOLEAN," // will be recognized as
tinyint(1) in debezium as
+ // it comes from show table command
+ + " PRIMARY KEY (id) "
+ + ")");
+ statement.execute("INSERT INTO sharding_table_1 values(1,
true),(2, false)");
+ }
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE sharding_tables (\n"
+ + "`id` BIGINT,"
+ + "status TINYINT,"
+ + "primary key (`id`) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'server-id' = '%s',"
+ + " 'server-time-zone' = 'UTC',"
+ + " 'scan.incremental.snapshot.chunk.size' =
'%s'"
+ + ")",
+ MYSQL_CONTAINER.getHost(),
+ MYSQL_CONTAINER.getDatabasePort(),
+ fullTypesMySql57Database.getUsername(),
+ fullTypesMySql57Database.getPassword(),
+ fullTypesMySql57Database.getDatabaseName(),
+ "sharding_table_.*",
+ incrementalSnapshot,
+ getServerId(),
+ getSplitSize());
+ String sinkDDL =
+ "CREATE TABLE sink ("
+ + " `id` BIGINT NOT NULL,"
+ + " status TINYINT,"
+ + " primary key (`id`) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM
sharding_tables");
+
+ try (Connection connection =
fullTypesMySql57Database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("INSERT INTO sharding_table_1 values(3,
true),(4, false)");
+ }
+ // wait for snapshot finished and begin binlog
+ waitForSinkSize("sink", 4);
+
+ try (Connection connection =
fullTypesMySql57Database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(String.format("USE %s",
fullTypesMySql57Database.getDatabaseName()));
+ statement.execute(
+ "CREATE TABLE sharding_table_2("
+ + "id BIGINT,"
+ + "status BOOLEAN," // will be recognized as
boolean in debezium as it
+ // comes from binlog
+ + " PRIMARY KEY (id) "
+ + ")");
+ statement.execute("INSERT INTO sharding_table_2 values(5,
true),(6, false)");
+ }
+
+ waitForSinkSize("sink", 6);
+ String[] expected =
+ new String[] {
+ "+I[1, 1]", "+I[2, 0]", "+I[3, 1]", "+I[4, 0]", "+I[5,
1]", "+I[6, 0]",
+ };
+ List<String> actual = TestValuesTableFactory.getResults("sink");
+ assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+ result.getJobClient().get().cancel().get();
+ }
+
+ @Test
+ public void testShardingTablesWithInconsistentSchema() throws Exception {
+ userDatabase1.createAndInitialize();
+ userDatabase2.createAndInitialize();
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE `user` ("
+ + " `id` DECIMAL(20, 0) NOT NULL,"
+ + " name STRING,"
+ + " address STRING,"
+ + " phone_number STRING,"
+ + " email STRING,"
+ + " age INT,"
+ + " primary key (`id`) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'server-time-zone' = 'UTC',"
+ + " 'server-id' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' =
'%s'"
+ + ")",
+ MYSQL_CONTAINER.getHost(),
+ MYSQL_CONTAINER.getDatabasePort(),
+ userDatabase1.getUsername(),
+ userDatabase1.getPassword(),
+ String.format(
+ "(%s|%s)",
+ userDatabase1.getDatabaseName(),
userDatabase2.getDatabaseName()),
+ "user_table_.*",
+ incrementalSnapshot,
+ getServerId(),
+ getSplitSize());
+ tEnv.executeSql(sourceDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("SELECT * FROM `user`");
+
+ CloseableIterator<Row> iterator = result.collect();
+ waitForSnapshotStarted(iterator);
+
+ try (Connection connection = userDatabase1.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("UPDATE user_table_1_1 SET email =
'[email protected]' WHERE id=111;");
+ }
+
+ try (Connection connection = userDatabase2.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE
id=221;");
+ }
+
+ String[] expected =
+ new String[] {
+ "+I[111, user_111, Shanghai, 123567891234,
[email protected], null]",
+ "-U[111, user_111, Shanghai, 123567891234,
[email protected], null]",
+ "+U[111, user_111, Shanghai, 123567891234,
[email protected], null]",
+ "+I[121, user_121, Shanghai, 123567891234, null, null]",
+ "+I[211, user_211, Shanghai, 123567891234, null, null]",
+ "+I[221, user_221, Shanghai, 123567891234, null, 18]",
+ "-U[221, user_221, Shanghai, 123567891234, null, 18]",
+ "+U[221, user_221, Shanghai, 123567891234, null, 20]",
+ };
+
+ assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator,
expected.length));
+ result.getJobClient().get().cancel().get();
+ }
+
+ //
------------------------------------------------------------------------------------
+
+ private String getServerId() {
+ final Random random = new Random();
+ int serverId = random.nextInt(100) + 5400;
+ if (incrementalSnapshot) {
+ return serverId + "-" + (serverId + env.getParallelism());
+ }
+ return String.valueOf(serverId);
+ }
+
+ protected String getServerId(int base) {
+ if (incrementalSnapshot) {
+ return base + "-" + (base + DEFAULT_PARALLELISM);
+ }
+ return String.valueOf(base);
+ }
+
+ private int getSplitSize() {
+ if (incrementalSnapshot) {
+ // test parallel read
+ return 4;
+ }
+ return 0;
+ }
+
+ private static String buildColumnsDDL(
+ String columnPrefix, int start, int end, String dataType) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = start; i < end; i++) {
+ stringBuilder.append(columnPrefix).append(i).append("
").append(dataType).append(",");
+ }
+ return stringBuilder.toString();
+ }
+
+ private static String getIntegerSeqString(int start, int end) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = start; i < end - 1; i++) {
+ stringBuilder.append(i).append(", ");
+ }
+ stringBuilder.append(end - 1);
+ return stringBuilder.toString();
+ }
+
+ private static void waitForSnapshotStarted(String sinkName) throws
InterruptedException {
+ while (sinkSize(sinkName) == 0) {
+ Thread.sleep(100);
+ }
+ }
+
+ private static void waitForSinkSize(String sinkName, int expectedSize)
+ throws InterruptedException {
+ while (sinkSize(sinkName) < expectedSize) {
+ Thread.sleep(100);
+ }
+ }
+
+ private static int sinkSize(String sinkName) {
+ synchronized (TestValuesTableFactory.class) {
+ try {
+ return TestValuesTableFactory.getRawResults(sinkName).size();
+ } catch (IllegalArgumentException e) {
+ // job is not started yet
+ return 0;
+ }
+ }
+ }
+
+ private static List<String> fetchRows(Iterator<Row> iter, int size) {
+ List<String> rows = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ Row row = iter.next();
+ rows.add(row.toString());
+ size--;
+ }
+ return rows;
+ }
+
+ private static void waitForSnapshotStarted(CloseableIterator<Row>
iterator) throws Exception {
+ while (!iterator.hasNext()) {
+ Thread.sleep(100);
+ }
+ }
+}