This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ceecd160 [flink][cdc] add option to decide convert mysql tinyint(1)
to bool or not (#1476)
3ceecd160 is described below
commit 3ceecd16029fa47a25bbed6d927e0de53505fb53
Author: legendtkl <[email protected]>
AuthorDate: Tue Jul 18 10:06:07 2023 +0800
[flink][cdc] add option to decide convert mysql tinyint(1) to bool or not
(#1476)
---
.../shortcodes/generated/mysql_sync_database.html | 2 +-
.../shortcodes/generated/mysql_sync_table.html | 2 +-
.../paimon/tests/cdc/MySqlCdcE2eTestBase.java | 186 +++++++++++++++++++++
.../tests/cdc/MySqlTinyIntConvertE2ETest.java | 105 ++++++++++++
.../src/test/resources/mysql/setup.sql | 60 +++++++
.../flink/action/cdc/mysql/MySqlActionUtils.java | 30 +++-
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 23 ++-
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 20 ++-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 15 +-
.../action/cdc/mysql/MySqlSyncTableAction.java | 13 +-
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 6 +-
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 22 ++-
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 157 +++++++++++++++++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 140 ++++++++++++++++
.../src/test/resources/mysql/setup.sql | 44 +++++
15 files changed, 801 insertions(+), 24 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index cf6ededc0..8336fe6c4 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -59,7 +59,7 @@ under the License.
</tr>
<tr>
<td><h5>--mysql-conf</h5></td>
- <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations.</td>
+ <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations. <br> Furthermore, TINYINT(1) type in
MySQL would be converted to Boolean b [...]
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index 15cb8da72..b9b286f9f 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -51,7 +51,7 @@ under the License.
</tr>
<tr>
<td><h5>--mysql-conf</h5></td>
- <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations.</td>
+ <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations. <br> Furthermore, TINYINT(1) type in
MySQL would be converted to Boolean b [...]
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index 3b6cbad41..c464f1fcf 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -304,6 +304,192 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
cancelJob(jobId);
}
+ @Test
+ public void testSyncTableWithTinyConvert() throws Exception {
+ String runActionCommand =
+ String.join(
+ " ",
+ "bin/flink",
+ "run",
+ "-D",
+ "execution.checkpointing.interval=1s",
+ "--detached",
+ "lib/paimon-flink-action.jar",
+ "mysql-sync-table",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "ts_table",
+ "--partition-keys",
+ "pt",
+ "--primary-keys",
+ "pt,_id",
+ "--mysql-conf",
+ "hostname=mysql-1",
+ "--mysql-conf",
+ String.format("port=%d", MySqlContainer.MYSQL_PORT),
+ "--mysql-conf",
+ String.format("username='%s'",
mySqlContainer.getUsername()),
+ "--mysql-conf",
+ String.format("password='%s'",
mySqlContainer.getPassword()),
+ "--mysql-conf",
+ "database-name='paimon_sync_table'",
+ "--mysql-conf",
+ "table-name='tinyint_schema_evolution_.+'",
+ "--mysql-conf",
+ "mysql.converter.tinyint1-to-bool='false'",
+ "--table-conf",
+ "bucket=2");
+ Container.ExecResult execResult =
+ jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ try (Connection conn = getMySqlConnection();
+ Statement statement = conn.createStatement()) {
+ testSyncTableImplWithTinyConvert(statement);
+ }
+ }
+
+ private void testSyncTableImplWithTinyConvert(Statement statement) throws
Exception {
+ statement.executeUpdate("USE paimon_sync_table");
+
+ statement.executeUpdate("INSERT INTO tinyint_schema_evolution_1 VALUES
(1, 1, 11)");
+ statement.executeUpdate(
+ "INSERT INTO tinyint_schema_evolution_2 VALUES (1, 2, 12), (2,
4, 24)");
+
+ String jobId =
+ runSql(
+ "INSERT INTO result1 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result1", "pt INT, _id INT,
_tinyint1 TINYINT"));
+ checkResult("1, 1, 11", "1, 2, 12", "2, 4, 24");
+ clearCurrentResults();
+ cancelJob(jobId);
+
+ statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_1 ADD
COLUMN v1 TINYINT(1)");
+ statement.executeUpdate(
+ "INSERT INTO tinyint_schema_evolution_1 VALUES (2, 3, 23, 30),
(1, 5, 15, 50)");
+ statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_2 ADD
COLUMN v1 TINYINT(1)");
+ statement.executeUpdate("INSERT INTO tinyint_schema_evolution_2 VALUES
(1, 6, 16, 60)");
+
+ jobId =
+ runSql(
+ "INSERT INTO result2 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink(
+ "result2", "pt INT, _id INT, _tinyint1
TINYINT, v1 TINYINT"));
+ checkResult(
+ "1, 1, 11, null",
+ "1, 2, 12, null",
+ "2, 3, 23, 30",
+ "2, 4, 24, null",
+ "1, 5, 15, 50",
+ "1, 6, 16, 60");
+ clearCurrentResults();
+ cancelJob(jobId);
+ }
+
+ @Test
+ public void testSyncDatabaseWithTinyConvert() throws Exception {
+ String runActionCommand =
+ String.join(
+ " ",
+ "bin/flink",
+ "run",
+ "-D",
+ "execution.checkpointing.interval=1s",
+ "--detached",
+ "lib/paimon-flink-action.jar",
+ "mysql-sync-database",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--mysql-conf",
+ "hostname=mysql-1",
+ "--mysql-conf",
+ String.format("port=%d", MySqlContainer.MYSQL_PORT),
+ "--mysql-conf",
+ String.format("username='%s'",
mySqlContainer.getUsername()),
+ "--mysql-conf",
+ String.format("password='%s'",
mySqlContainer.getPassword()),
+ "--mysql-conf",
+ "database-name='paimon_sync_database_tinyint'",
+ "--mysql-conf",
+ "mysql.converter.tinyint1-to-bool='false'",
+ "--table-conf",
+ "bucket=2");
+ jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+
+ try (Connection conn = getMySqlConnection();
+ Statement statement = conn.createStatement()) {
+ testSyncDatabaseImplWithTinyConvert(statement);
+ }
+ }
+
+ private void testSyncDatabaseImplWithTinyConvert(Statement statement)
throws Exception {
+ statement.executeUpdate("USE paimon_sync_database_tinyint");
+
+ statement.executeUpdate("INSERT INTO t1 VALUES (1, 10)");
+ statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20)");
+
+ String jobId =
+ runSql(
+ "INSERT INTO result1 SELECT * FROM t1;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result1", "k INT, v INT"));
+ checkResult("1, 10");
+ clearCurrentResults();
+ cancelJob(jobId);
+
+ jobId =
+ runSql(
+ "INSERT INTO result2 SELECT * FROM t2;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result2", "k1 INT, k2 VARCHAR(10),
v1 INT"));
+ checkResult("2, two, 20");
+ clearCurrentResults();
+ cancelJob(jobId);
+
+ statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 TINYINT(1)");
+ statement.executeUpdate("INSERT INTO t1 VALUES (3, 30, 42)");
+ statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v2 TINYINT(1)");
+ statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 40)");
+
+ jobId =
+ runSql(
+ "INSERT INTO result3 SELECT * FROM t1;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result3", "k INT, v INT, v1
TINYINT"));
+ checkResult("1, 10, null", "3, 30, 42");
+ clearCurrentResults();
+ cancelJob(jobId);
+
+ jobId =
+ runSql(
+ "INSERT INTO result4 SELECT * FROM t2;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result4", "k1 INT, k2 VARCHAR(10),
v1 INT, v2 TINYINT"));
+ checkResult("2, two, 20, null", "4, four, 40, 40");
+ clearCurrentResults();
+ cancelJob(jobId);
+ }
+
protected Connection getMySqlConnection() throws Exception {
return DriverManager.getConnection(
String.format(
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
new file mode 100644
index 000000000..3ff120293
--- /dev/null
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+/** E2e test for MySql CDC type convert tinyint(1) to tinyint. */
+public class MySqlTinyIntConvertE2ETest extends MySqlCdcE2eTestBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlTinyIntConvertE2ETest.class);
+
+ protected MySqlTinyIntConvertE2ETest() {
+ super(MySqlVersion.V5_7);
+ }
+
+ @Test
+ public void testSyncTable() throws Exception {
+ String runActionCommand =
+ String.join(
+ " ",
+ "bin/flink",
+ "run",
+ "-D",
+ "execution.checkpointing.interval=1s",
+ "--detached",
+ "lib/paimon-flink-action.jar",
+ "mysql-sync-table",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "tinyint_table",
+ "--primary-keys",
+ "pk",
+ "--mysql-conf",
+ "hostname=mysql-1",
+ "--mysql-conf",
+ String.format("port=%d", MySqlContainer.MYSQL_PORT),
+ "--mysql-conf",
+ String.format("username='%s'",
mySqlContainer.getUsername()),
+ "--mysql-conf",
+ String.format("password='%s'",
mySqlContainer.getPassword()),
+ "--mysql-conf",
+ "database-name='test_tinyint_convert'",
+ "--mysql-conf",
+ "table-name='T'",
+ "--mysql-conf",
+ "mysql.converter.tinyint1-to-bool='false'",
+ "--table-conf",
+ "bucket=2");
+ Container.ExecResult execResult =
+ jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ try (Connection conn = getMySqlConnection();
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate("USE test_tinyint_convert");
+
+ statement.executeUpdate("INSERT INTO T VALUES (1, '2023-05-10
12:30:20', 21)");
+
+ String jobId =
+ runSql(
+ "INSERT INTO result1 SELECT * FROM tinyint_table",
+ catalogDdl,
+ useCatalogCmd,
+ createResultSink(
+ "result1", "pk INT, _date TIMESTAMP(0),
_tinyint1 TINYINT"));
+ checkResult("1, 2023-05-10T12:30:20, 21");
+ clearCurrentResults();
+ cancelJob(jobId);
+ }
+ }
+
+ @Disabled("Not supported")
+ @Test
+ public void testSyncDatabase() {}
+}
diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql
b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
index cd5ae49c3..e7443cdc7 100644
--- a/paimon-e2e-tests/src/test/resources/mysql/setup.sql
+++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
@@ -41,6 +41,20 @@ CREATE TABLE schema_evolution_2 (
PRIMARY KEY (_id)
);
+CREATE TABLE tinyint_schema_evolution_1 (
+ pt INT,
+ _id INT,
+ _tinyint1 TINYINT(1),
+ PRIMARY KEY (_id)
+);
+
+CREATE TABLE tinyint_schema_evolution_2 (
+ pt INT,
+ _id INT,
+ _tinyint1 TINYINT(1),
+ PRIMARY KEY (_id)
+);
+
--
################################################################################
-- MySqlCdcE2eTestBase#testSyncDatabase
--
################################################################################
@@ -61,6 +75,22 @@ CREATE TABLE t2 (
PRIMARY KEY (k1, k2)
);
+CREATE DATABASE paimon_sync_database_tinyint;
+USE paimon_sync_database_tinyint;
+
+CREATE TABLE t1 (
+ k INT,
+ v INT,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+ k1 INT,
+ k2 VARCHAR(10),
+ v1 INT,
+ PRIMARY KEY (k1, k2)
+);
+
-- to make sure we use JDBC Driver correctly
CREATE DATABASE paimon_sync_database1;
USE paimon_sync_database1;
@@ -90,3 +120,33 @@ CREATE TABLE T (
_datetime DATETIME,
PRIMARY KEY (pk)
);
+
+--
################################################################################
+-- MySqlTinyIntConvertE2ETest#testSyncTable
+--
################################################################################
+
+CREATE DATABASE test_tinyint_convert;
+USE test_tinyint_convert;
+
+CREATE TABLE T (
+ pk INT,
+ _datetime DATETIME,
+ _tinyint1 TINYINT(1),
+ PRIMARY KEY (pk)
+);
+
+CREATE DATABASE paimon_sync_database_tinyint_schema;
+USE paimon_sync_database_tinyint_schema;
+
+CREATE TABLE schema_evolution_4 (
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
+ PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_5 (
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
+ v2 TINYINT(1) comment 'tinyint(1)',
+ PRIMARY KEY (_id)
+);
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 15b157ab5..756b94279 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -56,7 +56,8 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-class MySqlActionUtils {
+/** Utils for MySQL Action. * */
+public class MySqlActionUtils {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlActionUtils.class);
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
@@ -66,12 +67,33 @@ class MySqlActionUtils {
.withDescription(
"Whether capture the scan the newly added tables
or not, by default is true.");
+ public static final ConfigOption<Boolean> MYSQL_CONVERTER_TINYINT1_BOOL =
+ ConfigOptions.key("mysql.converter.tinyint1-to-bool")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Mysql tinyint type will be converted to boolean
type by default, if you want to convert to tinyint type, "
+ + "you can set this option to false.");
+
static Connection getConnection(Configuration mySqlConfig) throws
Exception {
- return DriverManager.getConnection(
+ String url =
String.format(
- "jdbc:mysql://%s:%d/",
+ "jdbc:mysql://%s:%d",
mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
- mySqlConfig.get(MySqlSourceOptions.PORT)),
+ mySqlConfig.get(MySqlSourceOptions.PORT));
+
+ // we need to add the `tinyInt1isBit` parameter to the connection url
to make sure the
+ // tinyint(1) in MySQL is converted to bits or not. Refer to
+ //
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
+ if (mySqlConfig.contains(MYSQL_CONVERTER_TINYINT1_BOOL)) {
+ url =
+ String.format(
+ "%s?tinyInt1isBit=%s",
+ url,
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
+ }
+
+ return DriverManager.getConnection(
+ url,
mySqlConfig.get(MySqlSourceOptions.USERNAME),
mySqlConfig.get(MySqlSourceOptions.PASSWORD));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 71de9e336..e66ab96ae 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -67,34 +67,42 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
private final boolean caseSensitive;
+
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
private final NewTableSchemaBuilder<String> schemaBuilder;
+ private final boolean convertTinyint1ToBool;
private JsonNode root;
private JsonNode payload;
public MySqlDebeziumJsonEventParser(
- ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn>
computedColumns) {
+ ZoneId serverTimeZone,
+ boolean caseSensitive,
+ List<ComputedColumn> computedColumns,
+ boolean convertTinyint1ToBool) {
this(
serverTimeZone,
caseSensitive,
computedColumns,
new TableNameConverter(caseSensitive),
- ddl -> Optional.empty());
+ ddl -> Optional.empty(),
+ convertTinyint1ToBool);
}
public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<String> schemaBuilder) {
+ NewTableSchemaBuilder<String> schemaBuilder,
+ boolean convertTinyint1ToBool) {
this(
serverTimeZone,
caseSensitive,
Collections.emptyList(),
tableNameConverter,
- schemaBuilder);
+ schemaBuilder,
+ convertTinyint1ToBool);
}
public MySqlDebeziumJsonEventParser(
@@ -102,12 +110,14 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<String> schemaBuilder) {
+ NewTableSchemaBuilder<String> schemaBuilder,
+ boolean convertTinyint1ToBool) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.tableNameConverter = tableNameConverter;
this.schemaBuilder = schemaBuilder;
+ this.convertTinyint1ToBool = convertTinyint1ToBool;
}
@Override
@@ -168,7 +178,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
MySqlTypeUtils.toDataType(
column.get("typeName").asText(),
length == null ? null : length.asInt(),
- scale == null ? null : scale.asInt());
+ scale == null ? null : scale.asInt(),
+ convertTinyint1ToBool);
if (column.get("optional").asBoolean()) {
type = type.nullable();
} else {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 55ffeea1b..a60733e10 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -40,7 +40,22 @@ public class MySqlSchema {
private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
private final List<String> primaryKeys;
- public MySqlSchema(DatabaseMetaData metaData, String databaseName, String
tableName)
+ public MySqlSchema(
+ String databaseName,
+ String tableName,
+ LinkedHashMap<String, Tuple2<DataType, String>> fields,
+ List<String> primaryKeys) {
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.fields = fields;
+ this.primaryKeys = primaryKeys;
+ }
+
+ public MySqlSchema(
+ DatabaseMetaData metaData,
+ String databaseName,
+ String tableName,
+ boolean convertTinyintToBool)
throws Exception {
this.databaseName = databaseName;
this.tableName = tableName;
@@ -63,7 +78,8 @@ public class MySqlSchema {
fields.put(
fieldName,
Tuple2.of(
- MySqlTypeUtils.toDataType(fieldType,
precision, scale),
+ MySqlTypeUtils.toDataType(
+ fieldType, precision, scale,
convertTinyintToBool),
fieldComment));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 74df7cbac..3901d5c5e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
+import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -237,10 +238,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
MySqlTableSchemaBuilder schemaBuilder =
new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
+ Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
EventParser.Factory<String> parserFactory =
() ->
new MySqlDebeziumJsonEventParser(
- zoneId, caseSensitive, tableNameConverter,
schemaBuilder);
+ zoneId,
+ caseSensitive,
+ tableNameConverter,
+ schemaBuilder,
+ convertTinyint1ToBool);
String database = this.database;
DatabaseSyncMode mode = this.mode;
@@ -294,7 +300,12 @@ public class MySqlSyncDatabaseAction extends ActionBase {
excludedTables.add(tableName);
continue;
}
- MySqlSchema mySqlSchema = new MySqlSchema(metaData,
databaseName, tableName);
+ MySqlSchema mySqlSchema =
+ new MySqlSchema(
+ metaData,
+ databaseName,
+ tableName,
+
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
if (mySqlSchema.primaryKeys().size() > 0) {
// only tables with primary keys will be considered
mySqlSchemaList.add(mySqlSchema);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index b4e154086..fef0bdb72 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -46,6 +46,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -175,8 +176,11 @@ public class MySqlSyncTableAction extends ActionBase {
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
+ Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
EventParser.Factory<String> parserFactory =
- () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive,
computedColumns);
+ () ->
+ new MySqlDebeziumJsonEventParser(
+ zoneId, caseSensitive, computedColumns,
convertTinyint1ToBool);
CdcSinkBuilder<String> sinkBuilder =
new CdcSinkBuilder<String>()
@@ -239,7 +243,12 @@ public class MySqlSyncTableAction extends ActionBase {
Matcher tableMatcher =
tablePattern.matcher(tableName);
if (tableMatcher.matches()) {
mySqlSchemaList.add(
- new MySqlSchema(metaData,
databaseName, tableName));
+ new MySqlSchema(
+ metaData,
+ databaseName,
+ tableName,
+ mySqlConfig.get(
+
MYSQL_CONVERTER_TINYINT1_BOOL)));
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index fb6a47cbc..ee89a5b3d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
@@ -104,7 +105,10 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<String> {
name.getSimpleName(),
Tuple2.of(
MySqlTypeUtils.toDataType(
- column.getDataType().getName(),
precision, scale),
+ column.getDataType().getName(),
+ precision,
+ scale,
+
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
comment == null ? null :
String.valueOf(comment.getValue())));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 71fef4cd8..bd288cee5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -45,6 +45,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
+
/** Converts from MySQL type to {@link DataType}. */
public class MySqlTypeUtils {
@@ -139,11 +141,15 @@ public class MySqlTypeUtils {
return toDataType(
MySqlTypeUtils.getShortType(mysqlType),
MySqlTypeUtils.getPrecision(mysqlType),
- MySqlTypeUtils.getScale(mysqlType));
+ MySqlTypeUtils.getScale(mysqlType),
+ MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue());
}
public static DataType toDataType(
- String type, @Nullable Integer length, @Nullable Integer scale) {
+ String type,
+ @Nullable Integer length,
+ @Nullable Integer scale,
+ Boolean tinyInt1ToBool) {
switch (type.toUpperCase()) {
case BIT:
case BOOLEAN:
@@ -152,9 +158,15 @@ public class MySqlTypeUtils {
case TINYINT:
// MySQL haven't boolean type, it uses tinyint(1) to
represents boolean type
// user should not use tinyint(1) to store number although
jdbc url parameter
- // tinyInt1isBit=false can help change the return value, it's
not a general way
- // btw: mybatis and mysql-connector-java map tinyint(1) to
boolean by default
- return length != null && length == 1 ? DataTypes.BOOLEAN() :
DataTypes.TINYINT();
+ // tinyInt1isBit=false can help change the return value, it's
not a general way.
+ // mybatis and mysql-connector-java map tinyint(1) to boolean
by default, we behave
+ // the same way by default. To store number (-128~127), we can
set the parameter
+ // tinyInt1ToByte (option 'mysql.converter.tinyint1-to-bool')
to false, then
+ // tinyint(1)
+ // will be mapped to TinyInt.
+ return length != null && length == 1 && tinyInt1ToBool
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 07aaebfcc..635f2053a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -69,6 +69,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
private static final String DATABASE_NAME = "paimon_sync_database";
+
+ private static final String DATABASE_NAME_TINYINT_CONVERT =
+ "paimon_sync_database_tinyint_schema";
+
+ private static final String DATABASE_NAME_TINYINT =
"paimon_sync_database_tinyint";
@TempDir java.nio.file.Path tempDir;
@Test
@@ -219,6 +224,103 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
}
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionWithTinyInt1Convert() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME_TINYINT_CONVERT);
+ mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ Collections.emptyMap(),
+ tableConfig);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testSchemaEvolutionImplWithTinyInt1Convert(statement);
+ }
+ }
+ }
+
+ private void testSchemaEvolutionImplWithTinyInt1Convert(Statement
statement) throws Exception {
+ FileStoreTable table1 = getFileStoreTable("schema_evolution_4");
+ FileStoreTable table2 = getFileStoreTable("schema_evolution_5");
+
+ statement.executeUpdate("USE " + DATABASE_NAME_TINYINT_CONVERT);
+
+ statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (1,
'one')");
+ statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (2,
'two', 21)");
+ statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (3,
'three')");
+ statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (4,
'four', 24)");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"_id", "v1"});
+
+ List<String> primaryKeys1 = Arrays.asList("_id");
+ List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
DataTypes.TINYINT()
+ },
+ new String[] {"_id", "v1", "v2"});
+ List<String> primaryKeys2 = Arrays.asList("_id");
+ expected = Arrays.asList("+I[2, two, 21]", "+I[4, four, 24]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_4 ADD COLUMN v2
TINYINT(1)");
+ statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (5,
'five', 42)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_5 ADD COLUMN v3
TINYINT(1)");
+ statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (6,
'six', 42, 43)");
+
+ rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
DataTypes.TINYINT()
+ },
+ new String[] {"_id", "v1", "v2"});
+
+ expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]",
"+I[5, five, 42]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.TINYINT(),
+ DataTypes.TINYINT()
+ },
+ new String[] {"_id", "v1", "v2", "v3"});
+ expected =
+ Arrays.asList(
+ "+I[2, two, 21, NULL]", "+I[4, four, 24, NULL]",
"+I[6, six, 42, 43]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+ }
+
@Test
public void testSpecifiedMySqlTable() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
@@ -937,6 +1039,61 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
return env.executeAsync();
}
+ @Test
+ @Timeout(60)
+ public void testTinyInt1Convert() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME_TINYINT);
+ mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ Collections.emptyMap(),
+ tableConfig);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testTinyInt1Convert(statement);
+ }
+ }
+ }
+
+ private void testTinyInt1Convert(Statement statement) throws Exception {
+ FileStoreTable table = getFileStoreTable("t4");
+
+ statement.executeUpdate("USE paimon_sync_database_tinyint");
+
+ statement.executeUpdate("INSERT INTO t4 VALUES (1, '2021-09-15
15:00:10', 21)");
+ statement.executeUpdate("INSERT INTO t4 VALUES (2, '2023-03-23
16:00:20', 42)");
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.TIMESTAMP(0),
DataTypes.TINYINT()
+ },
+ new String[] {"pk", "_datetime", "_tinyint1"});
+ List<String> expected =
+ Arrays.asList("+I[1, 2021-09-15T15:00:10, 21]", "+I[2,
2023-03-23T16:00:20, 42]");
+ waitForResult(expected, table, rowType, Arrays.asList("pk"));
+ }
+
private FileStoreTable getFileStoreTable(String tableName) throws
Exception {
Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
Identifier identifier = Identifier.create(database, tableName);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 8a608e674..033c0043b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -881,6 +881,146 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
}
}
+ @Test
+ @Timeout(60)
+ public void testTinyInt1Convert() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_tinyint1_convert");
+ mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = conn.createStatement()) {
+ statement.execute("USE paimon_sync_table");
+ statement.executeUpdate(
+ "INSERT INTO test_tinyint1_convert VALUES (1, '2021-09-15
15:00:10', 21)");
+ statement.executeUpdate(
+ "INSERT INTO test_tinyint1_convert VALUES (2, '2023-03-23
16:00:20', 42)");
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.TINYINT()
+ },
+ new String[] {"pk", "_datetime", "_tinyint1"});
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, 2021-09-15T15:00:10, 21]", "+I[2,
2023-03-23T16:00:20, 42]");
+ waitForResult(expected, table, rowType, Arrays.asList("pk"));
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionWithTinyint1Convert() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "paimon_sync_table_tinyint");
+ mySqlConfig.put("table-name", "schema_evolution_3");
+ mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "_id"),
+ Collections.singletonMap(
+ CatalogOptions.METASTORE.key(),
"test-alter-table"),
+ tableConfig);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ checkTableSchema(
+ "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT
NOT
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testSchemaEvolutionImplWithTinyIntConvert(statement);
+ }
+ }
+ }
+
+ private void testSchemaEvolutionImplWithTinyIntConvert(Statement
statement) throws Exception {
+ FileStoreTable table = getFileStoreTable();
+ statement.executeUpdate("USE paimon_sync_table_tinyint");
+
+ statement.executeUpdate("INSERT INTO schema_evolution_3 VALUES (1, 1,
'one')");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_3 VALUES (1, 2, 'two'), (2, 4,
'four')");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10)
+ },
+ new String[] {"pt", "_id", "v1"});
+ List<String> primaryKeys = Arrays.asList("pt", "_id");
+ List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2,
two]", "+I[2, 4, four]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_3 ADD COLUMN v2
TINYINT(1)");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_3 VALUES (2, 3, 'three', 30),
(1, 5, 'five', 50)");
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.TINYINT()
+ },
+ new String[] {"pt", "_id", "v1", "v2"});
+ expected =
+ Arrays.asList(
+ "+I[1, 1, one, NULL]",
+ "+I[1, 2, two, NULL]",
+ "+I[2, 3, three, 30]",
+ "+I[2, 4, four, NULL]",
+ "+I[1, 5, five, 50]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
private FileStoreTable getFileStoreTable() throws Exception {
Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
Identifier identifier = Identifier.create(database, tableName);
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index b4c877c40..5d9459273 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -274,6 +274,13 @@ CREATE TABLE test_computed_column (
PRIMARY KEY (pk)
);
+CREATE TABLE test_tinyint1_convert (
+ pk INT,
+ _datetime DATETIME,
+ _tinyint1 TINYINT(1),
+ PRIMARY KEY (pk)
+);
+
--
################################################################################
-- MySqlSyncDatabaseActionITCase
--
################################################################################
@@ -300,6 +307,17 @@ CREATE TABLE t3 (
v1 INT
);
+-- test tinyint(1) convert
+CREATE DATABASE paimon_sync_database_tinyint;
+USE paimon_sync_database_tinyint;
+
+CREATE TABLE t4 (
+ pk INT,
+ _datetime DATETIME,
+ _tinyint1 TINYINT(1),
+ PRIMARY KEY (pk)
+);
+
-- to make sure we use JDBC Driver correctly
CREATE DATABASE paimon_sync_database1;
USE paimon_sync_database1;
@@ -549,3 +567,29 @@ CREATE TABLE t2 (
v2 BIGINT,
PRIMARY KEY (k1, k2)
);
+
+CREATE DATABASE paimon_sync_table_tinyint;
+USE paimon_sync_table_tinyint;
+
+CREATE TABLE schema_evolution_3 (
+ pt INT comment 'primary',
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
+ PRIMARY KEY (_id)
+);
+
+CREATE DATABASE paimon_sync_database_tinyint_schema;
+USE paimon_sync_database_tinyint_schema;
+
+CREATE TABLE schema_evolution_4 (
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
+ PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_5 (
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
+ v2 TINYINT(1) comment 'tinyint(1)',
+ PRIMARY KEY (_id)
+);
\ No newline at end of file