This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a64c4afac [cdc] Add char-to-string TypeMapping (#2002)
a64c4afac is described below
commit a64c4afac58d5fc0368111dadf8292ad05f1ece9
Author: Kerwin <[email protected]>
AuthorDate: Thu Sep 14 17:01:02 2023 +0800
[cdc] Add char-to-string TypeMapping (#2002)
---
docs/content/how-to/cdc-ingestion.md | 7 ++-
.../shortcodes/generated/mysql_sync_database.html | 1 +
.../shortcodes/generated/mysql_sync_table.html | 1 +
.../paimon/flink/action/cdc/TypeMapping.java | 52 ++++++++----------
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 17 ++----
.../cdc/mysql/MySqlCdcTypeMappingITCase.java | 64 ++++++++++++++++++++++
.../resources/mysql/type_mapping_test_setup.sql | 15 +++++
7 files changed, 114 insertions(+), 43 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index f0ddbfa27..19766eb0a 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -687,9 +687,10 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be
ignored, `RENAME COLUMN` w
you can specify type mapping option `tinyint1-not-bool` (Use
`--type-mapping`), then the column will be mapped to TINYINT in Paimon table.
2. You can use type mapping option `to-nullable` (Use `--type-mapping`) to
ignore all NOT NULL constraints (except primary keys).
3. You can use type mapping option `to-string` (Use `--type-mapping`) to map
all MySQL data types to STRING.
-4. MySQL BIT(1) type will be mapped to Boolean.
-5. When using Hive catalog, MySQL TIME type will be mapped to STRING.
-6. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary
value is passed as bytes in binlog, so it
+4. You can use type mapping option `char-to-string` (Use `--type-mapping`) to
map MySQL CHAR(length)/VARCHAR(length) types to STRING.
+5. MySQL BIT(1) type will be mapped to Boolean.
+6. When using Hive catalog, MySQL TIME type will be mapped to STRING.
+7. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary
value is passed as bytes in binlog, so it
should be mapped to byte type (BYTES or VARBINARY). We choose VARBINARY
because it can retain the length information.
## FAQ
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index bd82e43c6..0838850fd 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -71,6 +71,7 @@ under the License.
This is used to solve the problem that Flink cannot
accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x'
operation.
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
+ <li>"char-to-string": maps MySQL
CHAR(length)/VARCHAR(length) types to STRING.</li>
</ul>
</td>
</tr>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index 0bc46e704..124802b17 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -55,6 +55,7 @@ under the License.
This is used to solve the problem that Flink cannot accept
the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
+ <li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length)
types to STRING.</li>
</ul>
</td>
</tr>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
index a5f4f3783..b6e22e74f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
@@ -21,15 +21,11 @@ package org.apache.paimon.flink.action.cdc;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
-import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
-
/** Utility that holds data type mapping options. */
public class TypeMapping implements Serializable {
@@ -50,31 +46,12 @@ public class TypeMapping implements Serializable {
}
public static TypeMapping parse(String[] rawOptions) {
- List<String> options =
+ Set<TypeMappingMode> typeMappingModes =
Arrays.stream(rawOptions)
.map(String::trim)
.map(String::toLowerCase)
- .collect(Collectors.toList());
-
- Set<TypeMappingMode> typeMappingModes = new HashSet<>();
-
- for (String option : options) {
- switch (option.toLowerCase()) {
- case "tinyint1-not-bool":
- typeMappingModes.add(TINYINT1_NOT_BOOL);
- break;
- case "to-nullable":
- typeMappingModes.add(TO_NULLABLE);
- break;
- case "to-string":
- typeMappingModes.add(TO_STRING);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported type mapping option: " + option);
- }
- }
-
+ .map(TypeMappingMode::mode)
+ .collect(Collectors.toSet());
return new TypeMapping(typeMappingModes);
}
@@ -86,12 +63,29 @@ public class TypeMapping implements Serializable {
* <li>TINYINT1_NOT_BOOL: maps MySQL TINYINT(1) to TINYINT instead of
BOOLEAN.
* <li>TO_NULLABLE: ignores all NOT NULL constraints (except for primary
keys).
* <li>TO_STRING: maps all MySQL types to STRING.
+ * <li>CHAR_TO_STRING: maps MySQL CHAR(length)/VARCHAR(length) types to
STRING.
* </ul>
*/
public enum TypeMappingMode {
TINYINT1_NOT_BOOL,
TO_NULLABLE,
- TO_STRING;
+ TO_STRING,
+ CHAR_TO_STRING;
+
+ private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS
=
+ Arrays.stream(TypeMappingMode.values())
+ .collect(
+ Collectors.toMap(
+ TypeMappingMode::configString,
Function.identity()));
+
+ public static TypeMappingMode mode(String option) {
+ TypeMappingMode typeMappingMode = TYPE_MAPPING_OPTIONS.get(option);
+ if (typeMappingMode == null) {
+ throw new UnsupportedOperationException(
+ "Unsupported type mapping option: " + option);
+ }
+ return typeMappingMode;
+ }
public String configString() {
return name().toLowerCase().replace("_", "-");
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index e75c4e05d..a9a3c9630 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.CHAR_TO_STRING;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
@@ -153,16 +154,8 @@ public class MySqlTypeUtils {
TypeMapping typeMapping) {
if (typeMapping.containsMode(TO_STRING)) {
return DataTypes.STRING();
- } else {
- return toDataType(type, length, scale,
typeMapping.containsMode(TINYINT1_NOT_BOOL));
}
- }
- private static DataType toDataType(
- String type,
- @Nullable Integer length,
- @Nullable Integer scale,
- boolean tinyInt1NotBool) {
switch (type.toUpperCase()) {
case BIT:
if (length == null || length == 1) {
@@ -180,7 +173,7 @@ public class MySqlTypeUtils {
// Mybatis and mysql-connector-java map tinyint(1) to boolean
by default, we behave
// the same way by default. To store number (-128~127), user
can set the type
// mapping option 'tinyint1-not-bool' then tinyint(1) will be
mapped to tinyint.
- return length != null && length == 1 && !tinyInt1NotBool
+ return length != null && length == 1 &&
!typeMapping.containsMode(TINYINT1_NOT_BOOL)
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
@@ -258,9 +251,11 @@ public class MySqlTypeUtils {
}
// because tidb ddl event does not contain field precision
case CHAR:
- return length == null || length == 0 ? DataTypes.STRING() :
DataTypes.CHAR(length);
+ return length == null || length == 0 ||
typeMapping.containsMode(CHAR_TO_STRING)
+ ? DataTypes.STRING()
+ : DataTypes.CHAR(length);
case VARCHAR:
- return length == null || length == 0
+ return length == null || length == 0 ||
typeMapping.containsMode(CHAR_TO_STRING)
? DataTypes.STRING()
: DataTypes.VARCHAR(length);
case TINYTEXT:
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
index f7a4dcf32..8f173dd8d 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.stream.IntStream;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.CHAR_TO_STRING;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
@@ -432,4 +433,67 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
.isTrue();
}
}
+
+ // --------------------------------------- char-to-string
+ // ---------------------------------------
+
+ @Test
+ @Timeout(60)
+ public void testCharToString() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "char_to_string_test");
+
+ MySqlSyncDatabaseAction action =
+ syncDatabaseActionBuilder(mySqlConfig)
+ .withMode(COMBINED.configString())
+ .withTypeMappingModes(CHAR_TO_STRING.configString())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable("t1");
+
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE char_to_string_test");
+
+ // test schema evolution
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.STRING()},
+ new String[] {"pk", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, 1]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
+
+ statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 CHAR(1)");
+ statement.executeUpdate("INSERT INTO t1 VALUES (2, '2', 'A'), (3,
'3', 'B')");
+
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.STRING(),
DataTypes.STRING()
+ },
+ new String[] {"pk", "v1", "v2"});
+ waitForResult(
+ Arrays.asList("+I[1, 1, NULL]", "+I[2, 2, A]", "+I[3, 3,
B]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
+
+ // test newly created table
+ statement.executeUpdate(
+ "CREATE TABLE _new_table (pk INT, v VARCHAR(10), PRIMARY
KEY (pk))");
+ statement.executeUpdate("INSERT INTO _new_table VALUES (1,
'Paimon')");
+
+ waitingTables("_new_table");
+ waitForResult(
+ Collections.singletonList("+I[1, Paimon]"),
+ getFileStoreTable("_new_table"),
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.STRING()},
+ new String[] {"pk", "v"}),
+ Collections.singletonList("pk"));
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/type_mapping_test_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/type_mapping_test_setup.sql
index 5b74751d3..471555442 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/type_mapping_test_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/type_mapping_test_setup.sql
@@ -267,3 +267,18 @@ CREATE TABLE t1 (
);
INSERT INTO t1 VALUES (1, 'A');
+
+--
################################################################################
+-- testCharToString
+--
################################################################################
+
+CREATE DATABASE char_to_string_test;
+USE char_to_string_test;
+
+CREATE TABLE t1 (
+ pk INT,
+ v1 VARCHAR(10) NOT NULL,
+ PRIMARY KEY (pk)
+);
+
+INSERT INTO t1 VALUES (1, '1');