This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8e0c85305 [flink][mysql-cdc] Introduce TypeMapping to specify data
type mapping rules (#1838)
8e0c85305 is described below
commit 8e0c853051ea884b5f19e9eab7922a37756ca6e3
Author: yuzelin <[email protected]>
AuthorDate: Mon Aug 21 18:20:32 2023 +0800
[flink][mysql-cdc] Introduce TypeMapping to specify data type mapping rules
(#1838)
---
docs/content/how-to/cdc-ingestion.md | 14 +-
.../shortcodes/generated/mysql_sync_database.html | 13 +
.../shortcodes/generated/mysql_sync_table.html | 13 +
.../org/apache/paimon/utils/DateTimeUtils.java | 2 +-
.../org/apache/paimon/utils/DateTimeUtilsTest.java | 45 +++
.../paimon/tests/cdc/MySqlCdcE2eTestBase.java | 25 +-
.../tests/cdc/MySqlComputedColumnE2ETest.java | 1 +
.../tests/cdc/MySqlTinyIntConvertE2ETest.java | 9 +-
.../paimon/flink/action/cdc/DatabaseSyncMode.java | 2 +-
.../paimon/flink/action/cdc/TypeMapping.java | 96 +++++
.../flink/action/cdc/mysql/MySqlActionUtils.java | 43 +--
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 25 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 17 +-
.../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 18 +-
.../action/cdc/mysql/MySqlSyncTableAction.java | 14 +-
.../cdc/mysql/MySqlSyncTableActionFactory.java | 11 +
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 32 +-
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 34 +-
.../flink/action/cdc/mysql/schema/MySqlSchema.java | 16 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 1 -
.../cdc/mysql/MySqlCdcTypeMappingITCase.java | 430 +++++++++++++++++++++
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 116 +-----
.../cdc/mysql/MySqlSyncTableActionITCase.java | 52 ---
.../test/resources/mysql/sync_database_setup.sql | 27 --
.../src/test/resources/mysql/sync_table_setup.sql | 10 -
...table_setup.sql => type_mapping_test_setup.sql} | 115 ++----
26 files changed, 784 insertions(+), 397 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 84b0203a1..10d2211f0 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -64,6 +64,7 @@ To use this feature through `flink run`, run the following
shell command.
--table <table-name> \
[--partition-keys <partition-keys>] \
[--primary-keys <primary-keys>] \
+ [--type-mapping <option1,option2...>] \
[--computed-column <'column-name=expr-name(args[, ...])'>
[--computed-column ...]] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
@@ -148,6 +149,7 @@ To use this feature through `flink run`, run the following
shell command.
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
[--mode <sync-mode>] \
+ [--type-mapping <option1,option2...>] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf
<paimon-table-sink-conf> ...]]
@@ -596,11 +598,13 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be
ignored, `RENAME COLUMN` w
{{< generated/compute_column >}}
-## Special Data Type Conversions
-1. MySQL TINYINT(1) type will be converted to Boolean by default. If you want
to store number (-128~127) in it like MySQL,
-you can specify that `--mysql-conf mysql.converter.tinyint1-to-bool=false`,
then the column will be mapped to TINYINT in Paimon table.
-2. MySQL BIT(1) type will be converted to Boolean.
-3. When using Hive catalog, MySQL TIME type will be converted to STRING.
+## Special Data Type Mapping
+1. MySQL TINYINT(1) type will be mapped to Boolean by default. If you want to
store numbers (-128~127) in it, as MySQL does,
+you can specify the type mapping option `tinyint1-not-bool`; the column will
then be mapped to TINYINT in the Paimon table.
+2. You can use the type mapping option `to-nullable` to ignore all NOT NULL
constraints (except for primary keys).
+3. You can use the type mapping option `to-string` to map all MySQL data types to
STRING.
+4. MySQL BIT(1) type will be mapped to Boolean.
+5. When using Hive catalog, MySQL TIME type will be mapped to STRING.
## FAQ
1. Chinese characters in records ingested from MySQL are garbled.
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index 0d07d824b..2ee73c010 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -61,6 +61,19 @@ under the License.
<td><h5>--mode</h5></td>
<td>It is used to specify synchronization mode.<br />Possible
values:<ul><li>"divided" (the default mode if you haven't specified one): start
a sink for each table, the synchronization of the new table requires restarting
the job.</li><li>"combined": start a single combined sink for all tables, the
new table will be automatically synchronized.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>--type-mapping</h5></td>
+ <td>It is used to specify how to map MySQL data types to Paimon
types.<br />
+ Supported options:
+ <ul>
+ <li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT
instead of BOOLEAN.</li>
+ <li>"to-nullable": ignores all NOT NULL constraints
(except for primary keys).
+ This is used to solve the problem that Flink cannot
accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x'
operation.
+ </li>
+ <li>"to-string": maps all MySQL types to STRING.</li>
+ </ul>
+ </td>
+ </tr>
<tr>
<td><h5>--mysql-conf</h5></td>
<td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index 15cb8da72..255e81c68 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -45,6 +45,19 @@ under the License.
<td><h5>--primary-keys</h5></td>
<td>The primary keys for Paimon table. If there are multiple primary
keys, connect them with comma, for example "buyer_id,seller_id".</td>
</tr>
+ <tr>
+ <td><h5>--type-mapping</h5></td>
+ <td>It is used to specify how to map MySQL data types to Paimon
types.<br />
+ Supported options:
+ <ul>
+ <li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT
instead of BOOLEAN.</li>
+ <li>"to-nullable": ignores all NOT NULL constraints (except
for primary keys).
+ This is used to solve the problem that Flink cannot accept
the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
+ </li>
+ <li>"to-string": maps all MySQL types to STRING.</li>
+ </ul>
+ </td>
+ </tr>
<tr>
<td><h5>--computed-column</h5></td>
<td>The definitions of computed columns. The argument field is from
MySQL table field name. See <a href="#computed-functions">here</a> for a
complete list of configurations. </td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 093613f95..a088129af 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -154,7 +154,7 @@ public class DateTimeUtils {
String nano = fraction.substring(0, precision);
if (nano.length() > 0) {
- ymdhms.append(".").append(fraction);
+ ymdhms.append(".").append(nano);
}
return ymdhms.toString();
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/DateTimeUtilsTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/DateTimeUtilsTest.java
new file mode 100644
index 000000000..e5c8facdc
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/DateTimeUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DateTimeUtils}. */
+public class DateTimeUtilsTest {
+
+ @Test
+ public void testFormatLocalDateTime() {
+ LocalDateTime time = LocalDateTime.of(2023, 8, 30, 12, 30, 59,
999_999_999);
+ String[] expectations = new String[10];
+ expectations[0] = "2023-08-30 12:30:59";
+ expectations[1] = "2023-08-30 12:30:59.9";
+ for (int i = 2; i <= 9; i++) {
+ expectations[i] = expectations[i - 1] + "9";
+ }
+
+ for (int precision = 0; precision <= 9; precision++) {
+ assertThat(DateTimeUtils.formatLocalDateTime(time, precision))
+ .isEqualTo(expectations[precision]);
+ }
+ }
+}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index aa4c83c94..d4c7158f0 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -34,7 +34,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -114,6 +114,7 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
ACTION_SYNC_TABLE,
"pt",
"pt,_id",
+ null,
ImmutableMap.of(),
ImmutableMap.of(
"database-name", "paimon_sync_table", "table-name",
"schema_evolution_.+"),
@@ -196,11 +197,11 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
@Test
public void testSyncDatabase() throws Exception {
-
runAction(
ACTION_SYNC_DATABASE,
null,
null,
+ null,
ImmutableMap.of(),
ImmutableMap.of("database-name", "paimon_sync_database"),
ImmutableMap.of("bucket", "2"));
@@ -273,6 +274,7 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
ACTION_SYNC_TABLE,
"pt",
"pt,_id",
+ "tinyint1-not-bool",
ImmutableMap.of(),
ImmutableMap.of(
"database-name",
@@ -338,6 +340,7 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
ACTION_SYNC_DATABASE,
null,
null,
+ "tinyint1-not-bool",
ImmutableMap.of(),
ImmutableMap.of(
"database-name",
@@ -426,18 +429,23 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
}
protected void runAction(
- @Nonnull String action,
- String partitionKeys,
- String primaryKeys,
- @Nonnull Map<String, String> computedColumn,
- @Nonnull Map<String, String> mysqlConf,
- @Nonnull Map<String, String> tableConf)
+ String action,
+ @Nullable String partitionKeys,
+ @Nullable String primaryKeys,
+ @Nullable String typeMappingOptions,
+ Map<String, String> computedColumn,
+ Map<String, String> mysqlConf,
+ Map<String, String> tableConf)
throws Exception {
String partitionKeysStr =
StringUtils.isBlank(partitionKeys) ? "" : "--partition-keys "
+ partitionKeys;
String primaryKeysStr =
StringUtils.isBlank(primaryKeys) ? "" : "--primary-keys " +
primaryKeys;
+ String typeMappingStr =
+ StringUtils.isBlank(typeMappingOptions)
+ ? ""
+ : "--type-mapping " + typeMappingOptions;
String tableStr = action.equals(ACTION_SYNC_TABLE) ? "--table
ts_table" : "";
List<String> computedColumns =
@@ -475,6 +483,7 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
tableStr,
partitionKeysStr,
primaryKeysStr,
+ typeMappingStr,
"--mysql-conf",
"hostname=mysql-1",
"--mysql-conf",
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 1f4dc3f27..054dfe47a 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -41,6 +41,7 @@ public class MySqlComputedColumnE2ETest extends
MySqlCdcE2eTestBase {
ACTION_SYNC_TABLE,
"_year",
"pk,_year",
+ null,
ImmutableMap.of("_year", "'year(_datetime)'"),
ImmutableMap.of("database-name", "'test_computed_column'",
"table-name", "'T'"),
ImmutableMap.of("bucket", "2"));
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
index 489de4c21..2c5f156b5 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -41,14 +41,9 @@ public class MySqlTinyIntConvertE2ETest extends
MySqlCdcE2eTestBase {
ACTION_SYNC_TABLE,
null,
"pk",
+ "tinyint1-not-bool",
ImmutableMap.of(),
- ImmutableMap.of(
- "database-name",
- "test_tinyint_convert",
- "table-name",
- "'T'",
- "mysql.converter.tinyint1-to-bool",
- "false"),
+ ImmutableMap.of("database-name", "test_tinyint_convert",
"table-name", "'T'"),
ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
index 594ab06a4..e8b90947f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
@@ -46,7 +46,7 @@ public enum DatabaseSyncMode implements Serializable {
case "combined":
return COMBINED;
default:
- throw new UnsupportedOperationException("Unsupported sink
mode: " + mode);
+ throw new UnsupportedOperationException("Unsupported mode: " +
mode);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
new file mode 100644
index 000000000..1066f001c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+
+/** Utility that holds data type mapping options. */
+public class TypeMapping implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Set<TypeMappingMode> typeMappingModes;
+
+ public TypeMapping(Set<TypeMappingMode> typeMappingModes) {
+ this.typeMappingModes = typeMappingModes;
+ }
+
+ public boolean containsMode(TypeMappingMode mode) {
+ return typeMappingModes.contains(mode);
+ }
+
+ public static TypeMapping defaultMapping() {
+ return new TypeMapping(Collections.emptySet());
+ }
+
+ public static TypeMapping parse(String[] rawOptions) {
+ List<String> options =
+ Arrays.stream(rawOptions)
+ .map(String::trim)
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+
+ Set<TypeMappingMode> typeMappingModes = new HashSet<>();
+
+ for (String option : options) {
+ switch (option.toLowerCase()) {
+ case "tinyint1-not-bool":
+ typeMappingModes.add(TINYINT1_NOT_BOOL);
+ break;
+ case "to-nullable":
+ typeMappingModes.add(TO_NULLABLE);
+ break;
+ case "to-string":
+ typeMappingModes.add(TO_STRING);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported type mapping option: " + option);
+ }
+ }
+
+ return new TypeMapping(typeMappingModes);
+ }
+
+ /**
+ * Describes how to map MySQL data types to Paimon data types. Currently,
three modes are
+ * supported:
+ *
+ * <ul>
+ * <li>TINYINT1_NOT_BOOL: maps MySQL TINYINT(1) to TINYINT instead of
BOOLEAN.
+ * <li>TO_NULLABLE: ignores all NOT NULL constraints (except for primary
keys).
+ * <li>TO_STRING: maps all MySQL types to STRING.
+ * </ul>
+ */
+ public enum TypeMappingMode {
+ TINYINT1_NOT_BOOL,
+ TO_NULLABLE,
+ TO_STRING
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 978d2eaa8..73e83a253 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchema;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
@@ -60,6 +61,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Utils for MySQL Action. * */
@@ -73,30 +75,17 @@ public class MySqlActionUtils {
.withDescription(
"Whether capture the scan the newly added tables
or not, by default is true.");
- public static final ConfigOption<Boolean> MYSQL_CONVERTER_TINYINT1_BOOL =
- ConfigOptions.key("mysql.converter.tinyint1-to-bool")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- "Mysql tinyint type will be converted to boolean
type by default, if you want to convert to tinyint type, "
- + "you can set this option to false.");
-
- static Connection getConnection(Configuration mySqlConfig) throws
Exception {
+ static Connection getConnection(Configuration mySqlConfig, boolean
tinyint1NotBool)
+ throws Exception {
String url =
String.format(
- "jdbc:mysql://%s:%d",
+ "jdbc:mysql://%s:%d%s",
mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
- mySqlConfig.get(MySqlSourceOptions.PORT));
-
- // we need to add the `tinyInt1isBit` parameter to the connection url
to make sure the
- // tinyint(1) in MySQL is converted to bits or not. Refer to
- //
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
- if (mySqlConfig.contains(MYSQL_CONVERTER_TINYINT1_BOOL)) {
- url =
- String.format(
- "%s?tinyInt1isBit=%s",
- url,
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
- }
+ mySqlConfig.get(MySqlSourceOptions.PORT),
+ // we need to add the `tinyInt1isBit` parameter to the
connection url to
+ // make sure the tinyint(1) in MySQL is converted to
bits or not. Refer to
+ //
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
+ tinyint1NotBool ? "?tinyInt1isBit=false" : "");
return DriverManager.getConnection(
url,
@@ -107,12 +96,15 @@ public class MySqlActionUtils {
static MySqlSchemasInfo getMySqlTableInfos(
Configuration mySqlConfig,
Predicate<String> monitorTablePredication,
- List<Identifier> excludedTables)
+ List<Identifier> excludedTables,
+ TypeMapping typeMapping)
throws Exception {
Pattern databasePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
MySqlSchemasInfo mySqlSchemasInfo = new MySqlSchemasInfo();
- try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
+ try (Connection conn =
+ MySqlActionUtils.getConnection(
+ mySqlConfig,
typeMapping.containsMode(TINYINT1_NOT_BOOL))) {
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet schemas = metaData.getCatalogs()) {
while (schemas.next()) {
@@ -124,10 +116,7 @@ public class MySqlActionUtils {
String tableName =
tables.getString("TABLE_NAME");
MySqlSchema mySqlSchema =
MySqlSchema.buildSchema(
- metaData,
- databaseName,
- tableName,
-
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
+ metaData, databaseName,
tableName, typeMapping);
Identifier identifier =
Identifier.create(databaseName, tableName);
if (monitorTablePredication.test(tableName)) {
mySqlSchemasInfo.addSchema(identifier,
mySqlSchema);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 35fd888c4..7377050de 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -26,6 +26,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
@@ -72,6 +73,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link EventParser} for MySQL Debezium JSON. */
@@ -89,7 +91,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Nullable private final Pattern excludingPattern;
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
- private final boolean convertTinyint1ToBool;
+ private final TypeMapping typeMapping;
private JsonNode root;
private JsonNode payload;
@@ -101,7 +103,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
ZoneId serverTimeZone,
boolean caseSensitive,
List<ComputedColumn> computedColumns,
- boolean convertTinyint1ToBool) {
+ TypeMapping typeMapping) {
this(
serverTimeZone,
caseSensitive,
@@ -110,7 +112,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
ddl -> Optional.empty(),
null,
null,
- convertTinyint1ToBool);
+ typeMapping);
}
public MySqlDebeziumJsonEventParser(
@@ -120,7 +122,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
- boolean convertTinyint1ToBool) {
+ TypeMapping typeMapping) {
this(
serverTimeZone,
caseSensitive,
@@ -129,7 +131,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
schemaBuilder,
includingPattern,
excludingPattern,
- convertTinyint1ToBool);
+ typeMapping);
}
public MySqlDebeziumJsonEventParser(
@@ -140,7 +142,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
- boolean convertTinyint1ToBool) {
+ TypeMapping typeMapping) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
@@ -148,7 +150,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
this.schemaBuilder = schemaBuilder;
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
- this.convertTinyint1ToBool = convertTinyint1ToBool;
+ this.typeMapping = typeMapping;
}
@Override
@@ -212,11 +214,10 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
column.get("typeName").asText(),
length == null ? null : length.asInt(),
scale == null ? null : scale.asInt(),
- convertTinyint1ToBool);
- if (column.get("optional").asBoolean()) {
- type = type.nullable();
- } else {
- type = type.notNull();
+ typeMapping);
+
+ if (!typeMapping.containsMode(TO_NULLABLE)) {
+ type = type.copy(column.get("optional").asBoolean());
}
String fieldName = column.get("name").asText();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 2858c78ad..3386920c7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -57,7 +58,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
-import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -111,6 +111,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
private String includingTables = ".*";
@Nullable String excludingTables;
private DatabaseSyncMode mode = DIVIDED;
+ private TypeMapping typeMapping = TypeMapping.defaultMapping();
// for test purpose
private final List<Identifier> monitoredTables = new ArrayList<>();
@@ -177,6 +178,11 @@ public class MySqlSyncDatabaseAction extends ActionBase {
return this;
}
+ public MySqlSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
+ }
+
public void build(StreamExecutionEnvironment env) throws Exception {
checkArgument(
!mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
@@ -198,7 +204,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
mySqlConfig,
tableName ->
shouldMonitorTable(tableName,
includingPattern, excludingPattern),
- excludedTables);
+ excludedTables,
+ typeMapping);
logNonPkTables(mySqlSchemasInfo.nonPkTables());
List<MySqlTableInfo> mySqlTableInfos =
mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
@@ -255,9 +262,9 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
- Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
+ TypeMapping typeMapping = this.typeMapping;
MySqlTableSchemaBuilder schemaBuilder =
- new MySqlTableSchemaBuilder(tableConfig, caseSensitive,
convertTinyint1ToBool);
+ new MySqlTableSchemaBuilder(tableConfig, caseSensitive,
typeMapping);
EventParser.Factory<String> parserFactory =
() ->
@@ -268,7 +275,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
schemaBuilder,
includingPattern,
excludingPattern,
- convertTinyint1ToBool);
+ typeMapping);
String database = this.database;
DatabaseSyncMode mode = this.mode;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index c320a3203..a7a092bbb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -40,15 +41,14 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
public Optional<Action> create(MultipleParameterTool params) {
checkRequiredArgument(params, "mysql-conf");
- MySqlSyncDatabaseAction mySqlSyncDatabaseAction =
+ MySqlSyncDatabaseAction action =
new MySqlSyncDatabaseAction(
getRequiredValue(params, "warehouse"),
getRequiredValue(params, "database"),
optionalConfigMap(params, "catalog-conf"),
optionalConfigMap(params, "mysql-conf"));
- mySqlSyncDatabaseAction
- .withTableConfig(optionalConfigMap(params, "table-conf"))
+ action.withTableConfig(optionalConfigMap(params, "table-conf"))
.ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
.mergeShards(
!params.has("merge-shards")
@@ -59,7 +59,12 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
.excludingTables(params.get("excluding-tables"))
.withMode(DatabaseSyncMode.fromString(params.get("mode")));
- return Optional.of(mySqlSyncDatabaseAction);
+ if (params.has("type-mapping")) {
+ String[] options = params.get("type-mapping").split(",");
+ action.withTypeMapping(TypeMapping.parse(options));
+ }
+
+ return Optional.of(action);
}
@Override
@@ -82,6 +87,7 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
+ "[--including-tables
<mysql-table-name|name-regular-expr>] "
+ "[--excluding-tables
<mysql-table-name|name-regular-expr>] "
+ "[--mode <sync-mode>] "
+ + "[--type-mapping <option1,option2...>] "
+ "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf>
[--catalog-conf <paimon-catalog-conf> ...]] "
+ "[--table-conf <paimon-table-sink-conf>
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -125,6 +131,10 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
" 2. 'combined': start a single combined sink for all tables,
the new table will be automatically synchronized.");
System.out.println();
+ System.out.println(
+ "--type-mapping is used to specify how to map MySQL type to
Paimon type. Please see the doc for usage.");
+ System.out.println();
+
System.out.println("MySQL CDC source conf syntax:");
System.out.println(" key=value");
System.out.println(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 4d3b0957c..f01e6fa70 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
@@ -50,7 +51,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
-import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -95,6 +95,7 @@ public class MySqlSyncTableAction extends ActionBase {
private Map<String, String> tableConfig = new HashMap<>();
private List<String> computedColumnArgs = new ArrayList<>();
+ private TypeMapping typeMapping = TypeMapping.defaultMapping();
public MySqlSyncTableAction(
String warehouse, String database, String table, Map<String,
String> mySqlConfig) {
@@ -141,6 +142,11 @@ public class MySqlSyncTableAction extends ActionBase {
return this;
}
+ public MySqlSyncTableAction withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
+ }
+
public void build(StreamExecutionEnvironment env) throws Exception {
checkArgument(
mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
@@ -155,7 +161,7 @@ public class MySqlSyncTableAction extends ActionBase {
MySqlSchemasInfo mySqlSchemasInfo =
MySqlActionUtils.getMySqlTableInfos(
- mySqlConfig, monitorTablePredication(), new
ArrayList<>());
+ mySqlConfig, monitorTablePredication(), new
ArrayList<>(), typeMapping);
validateMySqlTableInfos(mySqlSchemasInfo);
catalog.createDatabase(database, true);
@@ -201,11 +207,11 @@ public class MySqlSyncTableAction extends ActionBase {
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
- Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
+ TypeMapping typeMapping = this.typeMapping;
EventParser.Factory<String> parserFactory =
() ->
new MySqlDebeziumJsonEventParser(
- zoneId, caseSensitive, computedColumns,
convertTinyint1ToBool);
+ zoneId, caseSensitive, computedColumns,
typeMapping);
CdcSinkBuilder<String> sinkBuilder =
new CdcSinkBuilder<String>()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index 98a824e87..dd5828b69 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -64,6 +65,11 @@ public class MySqlSyncTableActionFactory implements
ActionFactory {
new
ArrayList<>(params.getMultiParameter("computed-column")));
}
+ if (params.has("type-mapping")) {
+ String[] options = params.get("type-mapping").split(",");
+ action.withTypeMapping(TypeMapping.parse(options));
+ }
+
return Optional.of(action);
}
@@ -80,6 +86,7 @@ public class MySqlSyncTableActionFactory implements
ActionFactory {
+ "--table <table-name> "
+ "[--partition-keys <partition-keys>] "
+ "[--primary-keys <primary-keys>] "
+ + "[--type-mapping <option1,option2...>] "
+ "[--computed-column <'column-name=expr-name(args[,
...])'> [--computed-column ...]] "
+ "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf>
[--catalog-conf <paimon-catalog-conf> ...]] "
@@ -98,6 +105,10 @@ public class MySqlSyncTableActionFactory implements
ActionFactory {
System.out.println("Primary keys will be derived from MySQL tables if
not specified.");
System.out.println();
+ System.out.println(
+ "--type-mapping is used to specify how to map MySQL type to
Paimon type. Please see the doc for usage.");
+ System.out.println();
+
System.out.println("Please see doc for usage of --computed-column.");
System.out.println();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 57fd97223..3779d1101 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
@@ -32,6 +33,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
@@ -39,13 +41,13 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
private final Map<String, String> tableConfig;
private final boolean caseSensitive;
- private final boolean convertTinyint1ToBool;
+ private final TypeMapping typeMapping;
public MySqlTableSchemaBuilder(
- Map<String, String> tableConfig, boolean caseSensitive, boolean
convertTinyint1ToBool) {
+ Map<String, String> tableConfig, boolean caseSensitive,
TypeMapping typeMapping) {
this.tableConfig = tableConfig;
this.caseSensitive = caseSensitive;
- this.convertTinyint1ToBool = convertTinyint1ToBool;
+ this.typeMapping = typeMapping;
}
@Override
@@ -56,17 +58,21 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
for (JsonNode element : columns) {
- Integer precision = element.has("length") ?
element.get("length").asInt() : null;
- Integer scale = element.has("scale") ?
element.get("scale").asInt() : null;
- fields.put(
- element.get("name").asText(),
- // TODO : add table comment and column comment when we
upgrade flink cdc to 2.4
+ JsonNode length = element.get("length");
+ JsonNode scale = element.get("scale");
+ DataType dataType =
MySqlTypeUtils.toDataType(
- element.get("typeExpression").asText(),
- precision,
- scale,
- convertTinyint1ToBool)
- .copy(element.get("optional").asBoolean()));
+ element.get("typeExpression").asText(),
+ length == null ? null : length.asInt(),
+ scale == null ? null : scale.asInt(),
+ typeMapping);
+
+ if (!typeMapping.containsMode(TO_NULLABLE)) {
+ dataType = dataType.copy(element.get("optional").asBoolean()); // copy() returns the adjusted type; keep it
+ }
+
+ // TODO : add table comment and column comment when we upgrade
flink cdc to 2.4
+ fields.put(element.get("name").asText(), dataType);
}
ArrayNode arrayNode = (ArrayNode)
jsonTable.get("primaryKeyColumnNames");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index f7b696a89..80ba982bc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -23,6 +23,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -45,7 +46,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
/** Converts from MySQL type to {@link DataType}. */
public class MySqlTypeUtils {
@@ -142,14 +144,26 @@ public class MySqlTypeUtils {
MySqlTypeUtils.getShortType(mysqlType),
MySqlTypeUtils.getPrecision(mysqlType),
MySqlTypeUtils.getScale(mysqlType),
- MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue());
+ false);
}
public static DataType toDataType(
String type,
@Nullable Integer length,
@Nullable Integer scale,
- Boolean tinyInt1ToBool) {
+ TypeMapping typeMapping) {
+ if (typeMapping.containsMode(TO_STRING)) {
+ return DataTypes.STRING();
+ } else {
+ return toDataType(type, length, scale,
typeMapping.containsMode(TINYINT1_NOT_BOOL));
+ }
+ }
+
+ private static DataType toDataType(
+ String type,
+ @Nullable Integer length,
+ @Nullable Integer scale,
+ boolean tinyInt1NotBool) {
switch (type.toUpperCase()) {
case BIT:
if (length == null || length == 1) {
@@ -161,15 +175,13 @@ public class MySqlTypeUtils {
case BOOL:
return DataTypes.BOOLEAN();
case TINYINT:
- // MySQL haven't boolean type, it uses tinyint(1) to
represents boolean type
- // user should not use tinyint(1) to store number although
jdbc url parameter
+ // MySQL has no boolean type; it uses tinyint(1) to
represent boolean values.
+ // User should not use tinyint(1) to store number although
jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's
not a general way.
- // mybatis and mysql-connector-java map tinyint(1) to boolean
by default, we behave
- // the same way by default. To store number (-128~127), we can
set the parameter
- // tinyInt1ToByte (option 'mysql.converter.tinyint1-to-bool')
to false, then
- // tinyint(1)
- // will be mapped to TinyInt.
- return length != null && length == 1 && tinyInt1ToBool
+ // Mybatis and mysql-connector-java map tinyint(1) to boolean
by default, we behave
+ // the same way by default. To store number (-128~127), user
can set the type
+ // mapping option 'tinyint1-not-bool' then tinyint(1) will be
mapped to tinyint.
+ return length != null && length == 1 && !tinyInt1NotBool
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 3e4964aff..5ca2d877b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql.schema;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;
@@ -49,29 +50,28 @@ public class MySqlSchema {
DatabaseMetaData metaData,
String databaseName,
String tableName,
- boolean convertTinyintToBool)
+ TypeMapping typeMapping)
throws SQLException {
LinkedHashMap<String, Pair<DataType, String>> fields = new
LinkedHashMap<>();
try (ResultSet rs = metaData.getColumns(databaseName, null, tableName,
null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
String fieldType = rs.getString("TYPE_NAME");
- Integer precision = rs.getInt("COLUMN_SIZE");
String fieldComment = rs.getString("REMARKS");
+ Integer precision = rs.getInt("COLUMN_SIZE");
if (rs.wasNull()) {
precision = null;
}
+
Integer scale = rs.getInt("DECIMAL_DIGITS");
if (rs.wasNull()) {
scale = null;
}
- fields.put(
- fieldName,
- Pair.of(
- MySqlTypeUtils.toDataType(
- fieldType, precision, scale,
convertTinyintToBool),
- fieldComment));
+ DataType paimonType =
+ MySqlTypeUtils.toDataType(fieldType, precision, scale,
typeMapping);
+
+ fields.put(fieldName, Pair.of(paimonType, fieldComment));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 7684ddbbc..f34b42f2b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -166,7 +166,6 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
protected List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, List<DataField> updatedDataFields) {
-
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
for (DataField oldField : oldRowType.getFields()) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
new file mode 100644
index 000000000..26e1638d9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT test for {@link TypeMapping} in MySQL CDC. */
+public class MySqlCdcTypeMappingITCase extends MySqlActionITCaseBase {
+
+ @BeforeAll
+ public static void startContainers() {
+ MYSQL_CONTAINER.withSetupSQL("mysql/type_mapping_test_setup.sql");
+ start();
+ }
+
+ // ------------------------------------- tinyint1-not-bool
-------------------------------------
+
+ @Test
+ @Timeout(60)
+ public void testTinyInt1NotBool() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "tinyint1_not_bool_test");
+
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withMode(COMBINED)
+ .withTypeMapping(new
TypeMapping(Collections.singleton(TINYINT1_NOT_BOOL)));
+ runActionWithDefaultEnv(action);
+
+ // read old data
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.TINYINT()},
+ new String[] {"pk", "_tinyint1"});
+ waitForResult(
+ Collections.singletonList("+I[1, 1]"),
+ getFileStoreTable("t1"),
+ rowType1,
+ Collections.singletonList("pk"));
+
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE tinyint1_not_bool_test");
+
+ // test schema evolution
+ statement.executeUpdate("ALTER TABLE t1 ADD COLUMN _new_tinyint1
TINYINT(1)");
+ statement.executeUpdate("INSERT INTO t1 VALUES (2, -128, 127)");
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
DataTypes.TINYINT(), DataTypes.TINYINT()
+ },
+ new String[] {"pk", "_tinyint1", "_new_tinyint1"});
+ waitForResult(
+ Arrays.asList("+I[1, 1, NULL]", "+I[2, -128, 127]"),
+ getFileStoreTable("t1"),
+ rowType2,
+ Collections.singletonList("pk"));
+
+ // test newly created table
+ statement.executeUpdate(
+ "CREATE TABLE t2 (pk INT, _tinyint1 TINYINT(1), PRIMARY
KEY (pk))");
+ statement.executeUpdate("INSERT INTO t2 VALUES (1, 1), (2, 127),
(3, -128)");
+
+ waitingTables("t2");
+ waitForResult(
+ Arrays.asList("+I[1, 1]", "+I[2, 127]", "+I[3, -128]"),
+ getFileStoreTable("t2"),
+ rowType1,
+ Collections.singletonList("pk"));
+ }
+ }
+
+ // --------------------------------------- all-to-string
---------------------------------------
+
+ // TODO: test BIT(n) after fix bit type
+ @Test
+ @Timeout(60)
+ public void testReadAllTypes() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "all_to_string_test");
+ mySqlConfig.put("table-name", "all_types_table");
+
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig)
+ .withPartitionKeys("pt")
+ .withPrimaryKeys("pt", "_id")
+ .withTypeMapping(new
TypeMapping(Collections.singleton(TO_STRING)));
+ runActionWithDefaultEnv(action);
+
+ int allTypeNums = 76;
+ DataType[] types =
+ IntStream.range(0, allTypeNums)
+ .mapToObj(i -> DataTypes.STRING())
+ .toArray(DataType[]::new);
+ types[0] = types[0].notNull();
+ types[1] = types[1].notNull();
+
+ RowType rowType =
+ RowType.of(
+ types,
+ new String[] {
+ "_id",
+ "pt",
+ "_bit1",
+ "_tinyint1",
+ "_boolean",
+ "_bool",
+ "_tinyint",
+ "_tinyint_unsigned",
+ "_tinyint_unsigned_zerofill",
+ "_smallint",
+ "_smallint_unsigned",
+ "_smallint_unsigned_zerofill",
+ "_mediumint",
+ "_mediumint_unsigned",
+ "_mediumint_unsigned_zerofill",
+ "_int",
+ "_int_unsigned",
+ "_int_unsigned_zerofill",
+ "_bigint",
+ "_bigint_unsigned",
+ "_bigint_unsigned_zerofill",
+ "_serial",
+ "_float",
+ "_float_unsigned",
+ "_float_unsigned_zerofill",
+ "_real",
+ "_real_unsigned",
+ "_real_unsigned_zerofill",
+ "_double",
+ "_double_unsigned",
+ "_double_unsigned_zerofill",
+ "_double_precision",
+ "_double_precision_unsigned",
+ "_double_precision_unsigned_zerofill",
+ "_numeric",
+ "_numeric_unsigned",
+ "_numeric_unsigned_zerofill",
+ "_fixed",
+ "_fixed_unsigned",
+ "_fixed_unsigned_zerofill",
+ "_decimal",
+ "_decimal_unsigned",
+ "_decimal_unsigned_zerofill",
+ "_date",
+ "_datetime",
+ "_datetime3",
+ "_datetime6",
+ "_datetime_p",
+ "_datetime_p2",
+ "_timestamp",
+ "_timestamp0",
+ "_char",
+ "_varchar",
+ "_tinytext",
+ "_text",
+ "_mediumtext",
+ "_longtext",
+ "_bin",
+ "_varbin",
+ "_tinyblob",
+ "_blob",
+ "_mediumblob",
+ "_longblob",
+ "_json",
+ "_enum",
+ "_year",
+ "_time",
+ "_point",
+ "_geometry",
+ "_linestring",
+ "_polygon",
+ "_multipoint",
+ "_multiline",
+ "_multipolygon",
+ "_geometrycollection",
+ "_set",
+ });
+
+ List<String> expected =
+ Arrays.asList(
+ "+I["
+ + "1, 1.1, "
+ + "true, "
+ + "1, 1, 0, 1, 2, 3, "
+ + "1000, 2000, 3000, "
+ + "100000, 200000, 300000, "
+ + "1000000, 2000000, 3000000, "
+ + "10000000000, 20000000000, 30000000000,
40000000000, "
+ + "1.5, 2.5, 3.5, "
+ + "1.000001, 2.000002, 3.000003, "
+ + "1.000011, 2.000022, 3.000033, "
+ + "1.000111, 2.000222, 3.000333, "
+ + "12345.11, 12345.22, 12345.33, "
+ + "1.2345678987654322E32,
1.2345678987654322E32, 1.2345678987654322E32, "
+ + "11111, 22222, 33333, "
+ + "2023-03-23, "
+ // display value of datetime is not affected
by timezone
+ + "2023-03-23 14:30:05.000, 2023-03-23
14:30:05.123, 2023-03-23 14:30:05.123456, "
+ + "2023-03-24 14:30:00.000, 2023-03-24
14:30:05.120, "
+ // display value of timestamp is affected by
timezone
+ // we store 2023-03-23T15:00:10.123456 in
UTC-8 system timezone
+ // and query this timestamp in UTC-5 MySQL
server timezone
+ // so the display value should increase by 3
hour
+ + "2023-03-23 18:00:10.123456, 2023-03-23
03:10:00.000000, "
+ + "Paimon, Apache Paimon, Apache Paimon MySQL
TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL
MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, "
+ + "bytes\u0000\u0000\u0000\u0000\u0000, more
bytes, TINYBLOB type test data, BLOB type test data, MEDIUMBLOB type test data,
LONGBLOB bytes test data, "
+ + "{\"a\": \"b\"}, "
+ + "value1, "
+ + "2023, "
+ + "10:13:23, "
+ +
"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, "
+ +
"{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0},
"
+ +
"{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, "
+ +
"{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0},
"
+ +
"{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, "
+ +
"{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0},
"
+ +
"{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0},
"
+ +
"{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0},
"
+ + "a,b"
+ + "]",
+ "+I["
+ + "2, 2.2, "
+ + "NULL, "
+ + "NULL, NULL, NULL, NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, 50000000000, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, "
+ + "NULL, NULL, "
+ + "NULL, NULL, NULL, NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, NULL, NULL, NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL, "
+ + "NULL"
+ + "]");
+
+ waitForResult(expected, getFileStoreTable(tableName), rowType,
Arrays.asList("pt", "_id"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionAndNewlyCreatedTable() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "all_to_string_test");
+
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .excludingTables("all_types_table")
+ .withMode(COMBINED)
+ .withTypeMapping(new
TypeMapping(Collections.singleton(TO_STRING)));
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE all_to_string_test");
+
+ // test schema evolution
+ statement.executeUpdate("INSERT INTO schema_evolution_test VALUES
(1, 1)");
+ FileStoreTable table = getFileStoreTable("schema_evolution_test");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.STRING().notNull(),
DataTypes.STRING()},
+ new String[] {"pk", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, 1]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_test MODIFY
COLUMN v1 BIGINT");
+ statement.executeUpdate("INSERT INTO schema_evolution_test VALUES
(2, 20000000000)");
+
+ waitForResult(
+ Arrays.asList("+I[1, 1]", "+I[2, 20000000000]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
+
+ statement.executeUpdate(
+ "ALTER TABLE schema_evolution_test ADD COLUMN v2
VARBINARY(10)");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_test VALUES (3, 3,
'0123456789'), (4, 4, 'A')");
+
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
DataTypes.STRING(), DataTypes.STRING()
+ },
+ new String[] {"pk", "v1", "v2"});
+ waitForResult(
+ Arrays.asList(
+ "+I[1, 1, NULL]",
+ "+I[2, 20000000000, NULL]",
+ "+I[3, 3, 0123456789]",
+ "+I[4, 4, A]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
+
+ // test newly created table
+ statement.executeUpdate(
+ "CREATE TABLE _new_table (pk INT, v VARBINARY(10), PRIMARY
KEY (pk))");
+ statement.executeUpdate("INSERT INTO _new_table VALUES (1,
'Paimon')");
+
+ waitingTables("_new_table");
+ waitForResult(
+ Collections.singletonList("+I[1, Paimon]"),
+ getFileStoreTable("_new_table"),
+ RowType.of(
+ new DataType[] {DataTypes.STRING().notNull(),
DataTypes.STRING()},
+ new String[] {"pk", "v"}),
+ Collections.singletonList("pk"));
+ }
+ }
+
+ // ------------------------------------- ignore-not-null
-------------------------------------
+ @Test
+ @Timeout(60)
+ public void testIgnoreNotNull() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "ignore_not_null_test");
+
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
+ .withMode(COMBINED)
+ .withTypeMapping(new
TypeMapping(Collections.singleton(TO_NULLABLE)));
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable("t1");
+ assertThat(table.rowType().getTypeAt(1).isNullable()).isTrue();
+
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE ignore_not_null_test");
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"pk", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, A]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
+
+ // test schema evolution
+ statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT NOT NULL
DEFAULT 100");
+ statement.executeUpdate("INSERT INTO t1 VALUES (2, 'B', 10)");
+
+ while (table.rowType().getFieldCount() != 3) {
+ table = table.copyWithLatestSchema();
+ Thread.sleep(1_000);
+ }
+ assertThat(table.rowType().getTypeAt(2).isNullable()).isTrue();
+
+ // Default values are not handled yet, so the pre-existing row reads NULL for v2
+ waitForResult(
+ Arrays.asList("+I[1, A, NULL]", "+I[2, B, 10]"),
+ table,
+ table.rowType(),
+ Collections.singletonList("pk"));
+
+ // test newly created table
+ statement.executeUpdate(
+ "CREATE TABLE _new_table (pk INT, v INT NOT NULL, PRIMARY
KEY (pk))");
+
+ waitingTables("_new_table");
+
assertThat(getFileStoreTable("_new_table").rowType().getTypeAt(1).isNullable())
+ .isTrue();
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index d3e5ea6d9..47ded57f8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -202,84 +202,6 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
waitForResult(expected, table2, rowType2, primaryKeys2);
}
- @Test
- @Timeout(60)
- public void testSchemaEvolutionWithTinyInt1Convert() throws Exception {
- Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name",
"paimon_sync_database_tinyint_schema");
- mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
- MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
- .withTableConfig(getBasicTableConfig());
- runActionWithDefaultEnv(action);
-
- try (Statement statement = getStatement()) {
- testSchemaEvolutionImplWithTinyInt1Convert(statement);
- }
- }
-
- private void testSchemaEvolutionImplWithTinyInt1Convert(Statement
statement) throws Exception {
- FileStoreTable table1 = getFileStoreTable("schema_evolution_4");
- FileStoreTable table2 = getFileStoreTable("schema_evolution_5");
-
- statement.executeUpdate("USE " +
"paimon_sync_database_tinyint_schema");
-
- statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (1,
'one')");
- statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (2,
'two', 21)");
- statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (3,
'three')");
- statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (4,
'four', 24)");
-
- RowType rowType1 =
- RowType.of(
- new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
- new String[] {"_id", "v1"});
-
- List<String> primaryKeys1 = Collections.singletonList("_id");
- List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
-
- waitForResult(expected, table1, rowType1, primaryKeys1);
-
- RowType rowType2 =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
DataTypes.TINYINT()
- },
- new String[] {"_id", "v1", "v2"});
- List<String> primaryKeys2 = Collections.singletonList("_id");
- expected = Arrays.asList("+I[2, two, 21]", "+I[4, four, 24]");
- waitForResult(expected, table2, rowType2, primaryKeys2);
-
- statement.executeUpdate("ALTER TABLE schema_evolution_4 ADD COLUMN v2
TINYINT(1)");
- statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (5,
'five', 42)");
- statement.executeUpdate("ALTER TABLE schema_evolution_5 ADD COLUMN v3
TINYINT(1)");
- statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (6,
'six', 42, 43)");
-
- rowType1 =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
DataTypes.TINYINT()
- },
- new String[] {"_id", "v1", "v2"});
-
- expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]",
"+I[5, five, 42]");
- waitForResult(expected, table1, rowType1, primaryKeys1);
-
- rowType2 =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
- DataTypes.VARCHAR(10),
- DataTypes.TINYINT(),
- DataTypes.TINYINT()
- },
- new String[] {"_id", "v1", "v2", "v3"});
- expected =
- Arrays.asList(
- "+I[2, two, 21, NULL]", "+I[4, four, 24, NULL]",
"+I[6, six, 42, 43]");
- waitForResult(expected, table2, rowType2, primaryKeys2);
- }
-
@Test
public void testSpecifiedMySqlTable() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
@@ -642,7 +564,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
}
@Test
- @Timeout(60)
+ @Timeout(120)
public void testAddIgnoredTable() throws Exception {
String mySqlDatabase = "paimon_sync_database_add_ignored_table";
@@ -952,42 +874,6 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
return env.executeAsync();
}
- @Test
- @Timeout(60)
- public void testTinyInt1Convert() throws Exception {
- Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "paimon_sync_database_tinyint");
- mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
- MySqlSyncDatabaseAction action =
- new MySqlSyncDatabaseAction(warehouse, database, mySqlConfig)
- .withTableConfig(getBasicTableConfig());
- runActionWithDefaultEnv(action);
-
- try (Statement statement = getStatement()) {
- testTinyInt1Convert(statement);
- }
- }
-
- private void testTinyInt1Convert(Statement statement) throws Exception {
- FileStoreTable table = getFileStoreTable("t4");
-
- statement.executeUpdate("USE paimon_sync_database_tinyint");
-
- statement.executeUpdate("INSERT INTO t4 VALUES (1, '2021-09-15
15:00:10', 21)");
- statement.executeUpdate("INSERT INTO t4 VALUES (2, '2023-03-23
16:00:20', 42)");
-
- RowType rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(), DataTypes.TIMESTAMP(0),
DataTypes.TINYINT()
- },
- new String[] {"pk", "_datetime", "_tinyint1"});
- List<String> expected =
- Arrays.asList("+I[1, 2021-09-15T15:00:10, 21]", "+I[2,
2023-03-23T16:00:20, 42]");
- waitForResult(expected, table, rowType,
Collections.singletonList("pk"));
- }
-
@Test
@Timeout(240)
public void testSyncManyTableWithLimitedMemory() throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 6100288d8..7f747b1b8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -794,58 +794,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
waitForResult(expected, table, rowType, Arrays.asList("pk",
"_year_date"));
}
- @Test
- @Timeout(60)
- public void testTinyInt1Convert() throws Exception {
- Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", DATABASE_NAME);
- mySqlConfig.put("table-name", "test_tinyint1_convert");
- mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
- MySqlSyncTableAction action =
- new MySqlSyncTableAction(warehouse, database, tableName,
mySqlConfig);
- runActionWithDefaultEnv(action);
-
- checkTableSchema(
- "[{\"id\":0,\"name\":\"pk\",\"type\":\"INT NOT
NULL\",\"description\":\"\"},"
- +
"{\"id\":1,\"name\":\"_tinyint1\",\"type\":\"TINYINT\",\"description\":\"\"}]");
-
- try (Statement statement = getStatement()) {
- statement.execute("USE " + DATABASE_NAME);
- statement.executeUpdate("INSERT INTO test_tinyint1_convert VALUES
(1, 21), (2, 42)");
-
- FileStoreTable table = getFileStoreTable();
- RowType rowType =
- RowType.of(
- new DataType[] {DataTypes.INT().notNull(),
DataTypes.TINYINT()},
- new String[] {"pk", "_tinyint1"});
- List<String> expected = Arrays.asList("+I[1, 21]", "+I[2, 42]");
- waitForResult(expected, table, rowType,
Collections.singletonList("pk"));
-
- // test schema evolution
- statement.executeUpdate(
- "ALTER TABLE test_tinyint1_convert ADD COLUMN
_new_tinyint1 TINYINT(1)");
- statement.executeUpdate(
- "INSERT INTO test_tinyint1_convert VALUES (3, 63, 1), (4,
127, -128)");
-
- rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
DataTypes.TINYINT(), DataTypes.TINYINT()
- },
- new String[] {"pk", "_tinyint1", "_new_tinyint1"});
- waitForResult(
- Arrays.asList(
- "+I[1, 21, NULL]",
- "+I[2, 42, NULL]",
- "+I[3, 63, 1]",
- "+I[4, 127, -128]"),
- table,
- rowType,
- Collections.singletonList("pk"));
- }
- }
-
@Test
@Timeout(60)
public void testSyncShards() throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 7f91e9df6..5232f4771 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -46,17 +46,6 @@ CREATE TABLE t3 (
v1 INT
);
--- test tinyint(1) convert
-CREATE DATABASE paimon_sync_database_tinyint;
-USE paimon_sync_database_tinyint;
-
-CREATE TABLE t4 (
- pk INT,
- _datetime DATETIME,
- _tinyint1 TINYINT(1),
- PRIMARY KEY (pk)
-);
-
-- to make sure we use JDBC Driver correctly
CREATE DATABASE paimon_sync_database1;
USE paimon_sync_database1;
@@ -322,22 +311,6 @@ CREATE TABLE a (
PRIMARY KEY (k)
);
-CREATE DATABASE paimon_sync_database_tinyint_schema;
-USE paimon_sync_database_tinyint_schema;
-
-CREATE TABLE schema_evolution_4 (
- _id INT comment '_id',
- v1 VARCHAR(10) comment 'v1',
- PRIMARY KEY (_id)
-);
-
-CREATE TABLE schema_evolution_5 (
- _id INT comment '_id',
- v1 VARCHAR(10) comment 'v1',
- v2 TINYINT(1) comment 'tinyint(1)',
- PRIMARY KEY (_id)
-);
-
CREATE DATABASE many_table_sync_test;
USE many_table_sync_test;
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 4e8415ffc..a0b06595e 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -280,16 +280,6 @@ CREATE TABLE test_computed_column (
PRIMARY KEY (pk)
);
--- ################################################################################
--- testTinyInt1Convert
--- ################################################################################
-
-CREATE TABLE test_tinyint1_convert (
- pk INT,
- _tinyint1 TINYINT(1),
- PRIMARY KEY (pk)
-);
-
-- ################################################################################
-- testSyncShard
-- ################################################################################
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
similarity index 83%
copy from paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
copy to paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
index 4e8415ffc..baf76b9d5 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
@@ -21,40 +21,37 @@
GRANT ALL PRIVILEGES ON *.* TO 'paimonuser'@'%';
-- ################################################################################
--- MySqlSyncTableActionITCase
+-- MySqlCdcTypeMappingITCase
-- ################################################################################
-CREATE DATABASE paimon_sync_table;
-USE paimon_sync_table;
+-- ################################################################################
+-- testTinyInt1NotBool
+-- ################################################################################
-CREATE TABLE schema_evolution_1 (
- pt INT comment 'primary',
- _id INT comment '_id',
- v1 VARCHAR(10) comment 'v1',
- PRIMARY KEY (_id)
-);
+CREATE DATABASE tinyint1_not_bool_test;
+USE tinyint1_not_bool_test;
-CREATE TABLE schema_evolution_2 (
- pt INT comment 'primary',
- _id INT comment '_id',
- v1 VARCHAR(10) comment 'v1',
- PRIMARY KEY (_id)
+CREATE TABLE t1 (
+ pk INT,
+ _tinyint1 TINYINT(1),
+ PRIMARY KEY (pk)
);
-CREATE TABLE schema_evolution_multiple (
- _id INT comment 'primary',
- v1 VARCHAR(10) comment 'v1',
- v2 INT comment 'v2',
- v3 VARCHAR(10) comment 'v3',
- PRIMARY KEY (_id)
-);
+INSERT INTO t1 VALUES (1, 1);
+
+CREATE DATABASE all_to_string_test;
+USE all_to_string_test;
+
+-- ################################################################################
+-- testReadAllTypes
+-- ################################################################################
CREATE TABLE all_types_table (
_id INT,
pt DECIMAL(2, 1),
-- BIT
_bit1 BIT,
- _bit BIT(64),
+ -- _bit BIT(64), TODO
-- TINYINT
_tinyint1 TINYINT(1),
_boolean BOOLEAN,
@@ -152,11 +149,10 @@ CREATE TABLE all_types_table (
PRIMARY KEY (_id)
);
-
INSERT INTO all_types_table VALUES (
1, 1.1,
-- BIT
- 1, B'11111000111',
+ 1, -- B'11111000111', TODO
-- TINYINT
true, true, false, 1, 2, 3,
-- SMALLINT
@@ -212,7 +208,7 @@ INSERT INTO all_types_table VALUES (
'a,b'
), (
2, 2.2,
- NULL, NULL,
+ NULL,
NULL, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL,
NULL, NULL, NULL,
@@ -246,80 +242,27 @@ INSERT INTO all_types_table VALUES (
NULL
);
-CREATE TABLE incompatible_field_1 (
- _id INT,
- v1 DATETIME,
- PRIMARY KEY (_id)
-);
-
-CREATE TABLE incompatible_field_2 (
- _id INT,
- v1 INT,
- PRIMARY KEY (_id)
-);
-
-CREATE TABLE incompatible_pk_1 (
- a INT,
- b BIGINT,
- c VARCHAR(20),
- PRIMARY KEY (a, b)
-);
-
-CREATE TABLE incompatible_pk_2 (
- a INT,
- b BIGINT,
- c VARCHAR(20),
- PRIMARY KEY (a)
-);
-
-CREATE TABLE test_computed_column (
- pk INT,
- _date DATE,
- _datetime DATETIME,
- _timestamp TIMESTAMP,
- PRIMARY KEY (pk)
-);
-
-- ################################################################################
--- testTinyInt1Convert
+-- testSchemaEvolutionAndNewlyCreatedTable
-- ################################################################################
-CREATE TABLE test_tinyint1_convert (
+CREATE TABLE schema_evolution_test (
pk INT,
- _tinyint1 TINYINT(1),
+ v1 INT,
PRIMARY KEY (pk)
);
-- ################################################################################
--- testSyncShard
+-- testIgnoreNotNull
-- ################################################################################
-CREATE DATABASE shard_1;
-USE shard_1;
+CREATE DATABASE ignore_not_null_test;
+USE ignore_not_null_test;
CREATE TABLE t1 (
pk INT,
- _date VARCHAR(10),
+ v1 VARCHAR(10) NOT NULL,
PRIMARY KEY (pk)
);
-CREATE TABLE t2 (
- pk INT,
- _date VARCHAR(10),
- PRIMARY KEY (pk)
-);
-
-CREATE DATABASE shard_2;
-USE shard_2;
-
-CREATE TABLE t1 (
- pk INT,
- _date VARCHAR(10),
- PRIMARY KEY (pk)
-);
-
-CREATE TABLE t2 (
- pk INT,
- _date VARCHAR(10),
- PRIMARY KEY (pk)
-);
+INSERT INTO t1 VALUES (1, 'A');