This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b5180f3b2 [flink][kafka-cdc] Kafka Canal CDC supports data type to 
string mapping (#1883)
b5180f3b2 is described below

commit b5180f3b2eb5da219b20a4fc993ad4793d20c127
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 29 16:08:51 2023 +0800

    [flink][kafka-cdc] Kafka Canal CDC supports data type to string mapping 
(#1883)
---
 docs/content/how-to/cdc-ingestion.md               |  2 +
 .../shortcodes/generated/kafka_sync_database.html  |  4 ++
 .../shortcodes/generated/kafka_sync_table.html     |  4 ++
 .../paimon/flink/action/cdc/kafka/KafkaSchema.java |  8 ++-
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  | 72 ++++++++++++++++------
 .../cdc/kafka/KafkaSyncDatabaseActionFactory.java  | 47 +++++++-------
 .../action/cdc/kafka/KafkaSyncTableAction.java     | 61 +++++++++++++-----
 .../cdc/kafka/KafkaSyncTableActionFactory.java     | 57 ++++++++---------
 .../flink/action/cdc/kafka/formats/DataFormat.java |  4 +-
 .../action/cdc/kafka/formats/RecordParser.java     | 18 +++++-
 .../cdc/kafka/formats/RecordParserFactory.java     |  3 +
 .../cdc/kafka/formats/canal/CanalRecordParser.java | 13 ++--
 .../cdc/kafka/formats/ogg/OggRecordParser.java     |  6 +-
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  8 +--
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     | 10 +--
 .../action/cdc/kafka/KafkaActionITCaseBase.java    | 11 +---
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  | 36 +++++++++++
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 34 ++++++++++
 .../flink/action/cdc/kafka/KafkaSchemaITCase.java  |  4 +-
 .../kafka/canal/database/tostring/canal-data-1.txt | 20 ++++++
 .../kafka/canal/table/tostring/canal-data-1.txt    | 20 ++++++
 21 files changed, 321 insertions(+), 121 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 0e8d78457..188a331d9 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -322,6 +322,7 @@ To use this feature through `flink run`, run the following 
shell command.
     --table <table-name> \
     [--partition-keys <partition-keys>] \
     [--primary-keys <primary-keys>] \
+    [--type-mapping to-string] \
     [--computed-column <'column-name=expr-name(args[, ...])'> 
[--computed-column ...]] \
     [--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
@@ -370,6 +371,7 @@ To use this feature through `flink run`, run the following 
shell command.
     [--table-suffix <paimon-table-suffix>] \
     [--including-tables <table-name|name-regular-expr>] \
     [--excluding-tables <table-name|name-regular-expr>] \
+    [--type-mapping to-string] \
     [--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index ac86c6449..e5d5671bd 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -57,6 +57,10 @@ under the License.
         <td><h5>--excluding-tables</h5></td>
         <td>It is used to specify which source tables are not to be 
synchronized. The usage is same as "--including-tables". "--excluding-tables" 
has higher priority than "--including-tables" if you specified both.</td>
     </tr>
+    <tr>
+        <td><h5>--type-mapping</h5></td>
+        <td>It is used to specify how to map MySQL data type to Paimon type. 
Currently, only support option "to-string": maps all MySQL types to STRING.</td>
+    </tr>
     <tr>
         <td><h5>--kafka-conf</h5></td>
         <td>The configuration for Flink Kafka sources. Each configuration 
should be specified in the format `key=value`. `properties.bootstrap.servers`, 
`topic`, `properties.group.id`,  and `value.format` are required 
configurations, others are optional.See its <a 
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options";>document</a>
 for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html 
b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 339732e38..a4e3e42df 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -45,6 +45,10 @@ under the License.
         <td><h5>--primary-keys</h5></td>
         <td>The primary keys for Paimon table. If there are multiple primary 
keys, connect them with comma, for example "buyer_id,seller_id".</td>
     </tr>
+    <tr>
+        <td><h5>--type-mapping</h5></td>
+        <td>It is used to specify how to map MySQL data type to Paimon type. 
Currently, only support option "to-string": maps all MySQL types to STRING.</td>
+    </tr>
     <tr>
         <td><h5>--computed-column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
Kafka topic's table field name. See <a href="#computed-functions">here</a> for 
a complete list of configurations. </td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 2e1a2f98e..0a5931955 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.types.DataType;
@@ -119,10 +120,12 @@ public class KafkaSchema {
      *
      * @param kafkaConfig The configuration for Kafka.
      * @param topic The topic to retrieve the schema for.
+     * @param typeMapping data type mapping options.
      * @return The Kafka schema for the topic.
      * @throws KafkaSchemaRetrievalException If unable to retrieve the schema 
after max retries.
      */
-    public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String 
topic)
+    public static KafkaSchema getKafkaSchema(
+            Configuration kafkaConfig, String topic, TypeMapping typeMapping)
             throws KafkaSchemaRetrievalException {
         KafkaConsumer<String, String> consumer = 
getKafkaEarliestConsumer(kafkaConfig, topic);
         int retry = 0;
@@ -130,7 +133,8 @@ public class KafkaSchema {
 
         DataFormat format = getDataFormat(kafkaConfig);
         RecordParser recordParser =
-                format.createParser(true, new TableNameConverter(true), 
Collections.emptyList());
+                format.createParser(
+                        true, new TableNameConverter(true), typeMapping, 
Collections.emptyList());
 
         while (true) {
             ConsumerRecords<String, String> consumerRecords =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 3c271a3ae..fcd879a77 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -40,6 +41,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 
@@ -82,32 +84,60 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  */
 public class KafkaSyncDatabaseAction extends ActionBase {
 
-    private final Configuration kafkaConfig;
     private final String database;
-    private final String tablePrefix;
-    private final String tableSuffix;
-    @Nullable private final Pattern includingPattern;
-    @Nullable private final Pattern excludingPattern;
-    private final Map<String, String> tableConfig;
+    private final Configuration kafkaConfig;
+
+    private Map<String, String> tableConfig = new HashMap<>();
+    private String tablePrefix = "";
+    private String tableSuffix = "";
+    private String includingTables = ".*";
+    @Nullable String excludingTables;
+    private TypeMapping typeMapping = TypeMapping.defaultMapping();
 
     public KafkaSyncDatabaseAction(
-            Map<String, String> kafkaConfig,
             String warehouse,
             String database,
-            @Nullable String tablePrefix,
-            @Nullable String tableSuffix,
-            @Nullable String includingTables,
-            @Nullable String excludingTables,
             Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
+            Map<String, String> kafkaConfig) {
         super(warehouse, catalogConfig);
-        this.kafkaConfig = Configuration.fromMap(kafkaConfig);
         this.database = database;
-        this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
-        this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
-        this.includingPattern = includingTables == null ? null : 
Pattern.compile(includingTables);
-        this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        this.kafkaConfig = Configuration.fromMap(kafkaConfig);
+    }
+
+    public KafkaSyncDatabaseAction withTableConfig(Map<String, String> 
tableConfig) {
         this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public KafkaSyncDatabaseAction withTablePrefix(@Nullable String 
tablePrefix) {
+        if (tablePrefix != null) {
+            this.tablePrefix = tablePrefix;
+        }
+        return this;
+    }
+
+    public KafkaSyncDatabaseAction withTableSuffix(@Nullable String 
tableSuffix) {
+        if (tableSuffix != null) {
+            this.tableSuffix = tableSuffix;
+        }
+        return this;
+    }
+
+    public KafkaSyncDatabaseAction includingTables(@Nullable String 
includingTables) {
+        if (includingTables != null) {
+            this.includingTables = includingTables;
+        }
+        return this;
+    }
+
+    public KafkaSyncDatabaseAction excludingTables(@Nullable String 
excludingTables) {
+        this.excludingTables = excludingTables;
+        return this;
+    }
+
+    public KafkaSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
@@ -125,11 +155,13 @@ public class KafkaSyncDatabaseAction extends ActionBase {
 
         DataFormat format = DataFormat.getDataFormat(kafkaConfig);
         RecordParser recordParser =
-                format.createParser(caseSensitive, tableNameConverter, 
Collections.emptyList());
+                format.createParser(
+                        caseSensitive, tableNameConverter, typeMapping, 
Collections.emptyList());
         RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
                 new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
-        Pattern includingPattern = this.includingPattern;
-        Pattern excludingPattern = this.excludingPattern;
+        Pattern includingPattern = Pattern.compile(includingTables);
+        Pattern excludingPattern =
+                excludingTables == null ? null : 
Pattern.compile(excludingTables);
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
                 () ->
                         new RichCdcMultiplexRecordEventParser(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index 354b5405f..9a33e6f91 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -20,10 +20,10 @@ package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
-import java.util.Map;
 import java.util.Optional;
 
 /** Factory to create {@link KafkaSyncDatabaseAction}. */
@@ -38,31 +38,27 @@ public class KafkaSyncDatabaseActionFactory implements 
ActionFactory {
 
     @Override
     public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "warehouse");
-        checkRequiredArgument(params, "database");
         checkRequiredArgument(params, "kafka-conf");
 
-        String warehouse = params.get("warehouse");
-        String database = params.get("database");
-        String tablePrefix = params.get("table-prefix");
-        String tableSuffix = params.get("table-suffix");
-        String includingTables = params.get("including-tables");
-        String excludingTables = params.get("excluding-tables");
-
-        Map<String, String> kafkaConfigOption = optionalConfigMap(params, 
"kafka-conf");
-        Map<String, String> catalogConfigOption = optionalConfigMap(params, 
"catalog-conf");
-        Map<String, String> tableConfigOption = optionalConfigMap(params, 
"table-conf");
-        return Optional.of(
+        KafkaSyncDatabaseAction action =
                 new KafkaSyncDatabaseAction(
-                        kafkaConfigOption,
-                        warehouse,
-                        database,
-                        tablePrefix,
-                        tableSuffix,
-                        includingTables,
-                        excludingTables,
-                        catalogConfigOption,
-                        tableConfigOption));
+                        getRequiredValue(params, "warehouse"),
+                        getRequiredValue(params, "database"),
+                        optionalConfigMap(params, "catalog-conf"),
+                        optionalConfigMap(params, "kafka-conf"));
+
+        action.withTableConfig(optionalConfigMap(params, "table-conf"))
+                .withTablePrefix(params.get("table-prefix"))
+                .withTableSuffix(params.get("table-suffix"))
+                .includingTables(params.get("including-tables"))
+                .excludingTables(params.get("excluding-tables"));
+
+        if (params.has("type-mapping")) {
+            String[] options = params.get("type-mapping").split(",");
+            action.withTypeMapping(TypeMapping.parse(options));
+        }
+
+        return Optional.of(action);
     }
 
     @Override
@@ -81,6 +77,7 @@ public class KafkaSyncDatabaseActionFactory implements 
ActionFactory {
                         + "[--table-suffix <paimon-table-suffix>] "
                         + "[--including-tables <table-name|name-regular-expr>] 
"
                         + "[--excluding-tables <table-name|name-regular-expr>] 
"
+                        + "[--type-mapping <option1,option2...>] "
                         + "[--kafka-conf <kafka-source-conf> [--kafka-conf 
<kafka-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -102,6 +99,10 @@ public class KafkaSyncDatabaseActionFactory implements 
ActionFactory {
                 "--excluding-tables has higher priority than 
--including-tables if you specified both.");
         System.out.println();
 
+        System.out.println(
+                "--type-mapping is used to specify how to map MySQL type to 
Paimon type. Please see the doc for usage.");
+        System.out.println();
+
         System.out.println("kafka source conf syntax:");
         System.out.println("  key=value");
         System.out.println(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index f55b3f670..2dffcfcab 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
@@ -41,6 +42,9 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -80,40 +84,66 @@ import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComput
  */
 public class KafkaSyncTableAction extends ActionBase {
 
-    private final Configuration kafkaConfig;
     private final String database;
     private final String table;
-    private final List<String> partitionKeys;
-    private final List<String> primaryKeys;
+    private final Configuration kafkaConfig;
 
-    private final List<String> computedColumnArgs;
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> primaryKeys = new ArrayList<>();
 
-    private final Map<String, String> tableConfig;
+    private Map<String, String> tableConfig = new HashMap<>();
+    private List<String> computedColumnArgs = new ArrayList<>();
+    private TypeMapping typeMapping = TypeMapping.defaultMapping();
 
     public KafkaSyncTableAction(
-            Map<String, String> kafkaConfig,
             String warehouse,
             String database,
             String table,
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            List<String> computedColumnArgs,
             Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
+            Map<String, String> kafkaConfig) {
         super(warehouse, catalogConfig);
-        this.kafkaConfig = Configuration.fromMap(kafkaConfig);
         this.database = database;
         this.table = table;
+        this.kafkaConfig = Configuration.fromMap(kafkaConfig);
+    }
+
+    public KafkaSyncTableAction withPartitionKeys(String... partitionKeys) {
+        return withPartitionKeys(Arrays.asList(partitionKeys));
+    }
+
+    public KafkaSyncTableAction withPartitionKeys(List<String> partitionKeys) {
         this.partitionKeys = partitionKeys;
+        return this;
+    }
+
+    public KafkaSyncTableAction withPrimaryKeys(String... primaryKeys) {
+        return withPrimaryKeys(Arrays.asList(primaryKeys));
+    }
+
+    public KafkaSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
         this.primaryKeys = primaryKeys;
-        this.computedColumnArgs = computedColumnArgs;
+        return this;
+    }
+
+    public KafkaSyncTableAction withTableConfig(Map<String, String> 
tableConfig) {
         this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public KafkaSyncTableAction withComputedColumnArgs(List<String> 
computedColumnArgs) {
+        this.computedColumnArgs = computedColumnArgs;
+        return this;
+    }
+
+    public KafkaSyncTableAction withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
         KafkaSource<String> source = 
KafkaActionUtils.buildKafkaSource(kafkaConfig);
         String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0);
-        KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig, 
topic);
+        KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig, 
topic, typeMapping);
 
         catalog.createDatabase(database, true);
         boolean caseSensitive = catalog.caseSensitive();
@@ -140,7 +170,10 @@ public class KafkaSyncTableAction extends ActionBase {
         DataFormat format = DataFormat.getDataFormat(kafkaConfig);
         RecordParser recordParser =
                 format.createParser(
-                        caseSensitive, new TableNameConverter(caseSensitive), 
computedColumns);
+                        caseSensitive,
+                        new TableNameConverter(caseSensitive),
+                        typeMapping,
+                        computedColumns);
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
                 RichCdcMultiplexRecordEventParser::new;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
index ad748713e..3af27df4c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
@@ -20,17 +20,13 @@ package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /** Factory to create {@link KafkaSyncTableAction}. */
 public class KafkaSyncTableActionFactory implements ActionFactory {
@@ -45,42 +41,36 @@ public class KafkaSyncTableActionFactory implements 
ActionFactory {
     @Override
     public Optional<Action> create(MultipleParameterTool params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
+        checkRequiredArgument(params, "kafka-conf");
+
+        KafkaSyncTableAction action =
+                new KafkaSyncTableAction(
+                                tablePath.f0,
+                                tablePath.f1,
+                                tablePath.f2,
+                                optionalConfigMap(params, "catalog-conf"),
+                                optionalConfigMap(params, "kafka-conf"))
+                        .withTableConfig(optionalConfigMap(params, 
"table-conf"));
 
-        List<String> partitionKeys = Collections.emptyList();
         if (params.has("partition-keys")) {
-            partitionKeys =
-                    Arrays.stream(params.get("partition-keys").split(","))
-                            .collect(Collectors.toList());
+            action.withPartitionKeys(params.get("partition-keys").split(","));
         }
 
-        List<String> primaryKeys = Collections.emptyList();
         if (params.has("primary-keys")) {
-            primaryKeys =
-                    Arrays.stream(params.get("primary-keys").split(","))
-                            .collect(Collectors.toList());
+            action.withPrimaryKeys(params.get("primary-keys").split(","));
         }
-        List<String> computedColumnArgs = Collections.emptyList();
+
         if (params.has("computed-column")) {
-            computedColumnArgs = new 
ArrayList<>(params.getMultiParameter("computed-column"));
+            action.withComputedColumnArgs(
+                    new 
ArrayList<>(params.getMultiParameter("computed-column")));
         }
 
-        checkRequiredArgument(params, "kafka-conf");
-
-        Map<String, String> kafkaConfig = optionalConfigMap(params, 
"kafka-conf");
-        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
-        Map<String, String> paimonConfig = optionalConfigMap(params, 
"table-conf");
+        if (params.has("type-mapping")) {
+            String[] options = params.get("type-mapping").split(",");
+            action.withTypeMapping(TypeMapping.parse(options));
+        }
 
-        return Optional.of(
-                new KafkaSyncTableAction(
-                        kafkaConfig,
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        partitionKeys,
-                        primaryKeys,
-                        computedColumnArgs,
-                        catalogConfig,
-                        paimonConfig));
+        return Optional.of(action);
     }
 
     @Override
@@ -96,6 +86,7 @@ public class KafkaSyncTableActionFactory implements 
ActionFactory {
                         + "--table <table-name> "
                         + "[--partition-keys <partition-keys>] "
                         + "[--primary-keys <primary-keys>] "
+                        + "[--type-mapping <option1,option2...>] "
                         + "[--computed-column <'column-name=expr-name(args[, 
...])'> [--computed-column ...]] "
                         + "[--kafka-conf <kafka-source-conf> [--kafka-conf 
<kafka-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
@@ -114,6 +105,10 @@ public class KafkaSyncTableActionFactory implements 
ActionFactory {
         System.out.println("Primary keys will be derived from tables if not 
specified.");
         System.out.println();
 
+        System.out.println(
+                "--type-mapping is used to specify how to map MySQL type to 
Paimon type. Please see the doc for usage.");
+        System.out.println();
+
         System.out.println("Please see doc for usage of --computed-column.");
         System.out.println();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
index 4a95b88e9..155532900 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.formats;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import 
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
 import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
 
@@ -58,8 +59,9 @@ public enum DataFormat {
     public RecordParser createParser(
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
+            TypeMapping typeMapping,
             List<ComputedColumn> computedColumns) {
-        return parser.createParser(caseSensitive, tableNameConverter, 
computedColumns);
+        return parser.createParser(caseSensitive, typeMapping, 
tableNameConverter, computedColumns);
     }
 
     /**
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 23de278b3..8527d93b6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.flink.action.cdc.kafka.formats;
 
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.utils.StringUtils;
@@ -50,11 +52,15 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     protected static final String FIELD_TABLE = "table";
     protected static final String FIELD_DATABASE = "database";
     protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     protected final TableNameConverter tableNameConverter;
+    protected final boolean caseSensitive;
+    protected final TypeMapping typeMapping;
+    protected final List<ComputedColumn> computedColumns;
+
     protected JsonNode root;
     protected String databaseName;
     protected String tableName;
-    protected boolean caseSensitive;
 
     protected abstract List<RichCdcMultiplexRecord> extractRecords();
 
@@ -64,9 +70,15 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
 
     public abstract KafkaSchema getKafkaSchema(String record);
 
-    public RecordParser(TableNameConverter tableNameConverter, boolean 
caseSensitive) {
-        this.tableNameConverter = tableNameConverter;
+    public RecordParser(
+            boolean caseSensitive,
+            TypeMapping typeMapping,
+            TableNameConverter tableNameConverter,
+            List<ComputedColumn> computedColumns) {
         this.caseSensitive = caseSensitive;
+        this.typeMapping = typeMapping;
+        this.tableNameConverter = tableNameConverter;
+        this.computedColumns = computedColumns;
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
index ee4efe297..413ecbc4f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.formats;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import java.util.List;
 
@@ -38,12 +39,14 @@ public interface RecordParserFactory {
      * Creates a new instance of {@link RecordParser} with the specified 
configurations.
      *
      * @param caseSensitive Indicates whether the parser should be 
case-sensitive.
+     * @param typeMapping Data type mapping options.
      * @param tableNameConverter Converter to transform table names.
      * @param computedColumns List of computed columns to be considered by the 
parser.
      * @return A new instance of {@link RecordParser}.
      */
     RecordParser createParser(
             boolean caseSensitive,
+            TypeMapping typeMapping,
             TableNameConverter tableNameConverter,
             List<ComputedColumn> computedColumns);
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 17321214e..5f3b82c40 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -20,6 +20,7 @@ package 
org.apache.paimon.flink.action.cdc.kafka.formats.canal;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
@@ -71,14 +72,12 @@ public class CanalRecordParser extends RecordParser {
     private static final String OP_INSERT = "INSERT";
     private static final String OP_DELETE = "DELETE";
 
-    private final List<ComputedColumn> computedColumns;
-
     public CanalRecordParser(
             boolean caseSensitive,
+            TypeMapping typeMapping,
             TableNameConverter tableNameConverter,
             List<ComputedColumn> computedColumns) {
-        super(tableNameConverter, caseSensitive);
-        this.computedColumns = computedColumns;
+        super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
     }
 
     @Override
@@ -123,7 +122,8 @@ public class CanalRecordParser extends RecordParser {
         LinkedHashMap<String, String> mySqlFieldTypes = 
extractFieldTypesFromMySqlType();
         LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
         mySqlFieldTypes.forEach(
-                (name, type) -> paimonFieldTypes.put(name, 
MySqlTypeUtils.toDataType(type)));
+                (name, type) ->
+                        paimonFieldTypes.put(name, 
MySqlTypeUtils.toDataType(type, typeMapping)));
 
         return new KafkaSchema(
                 extractString(FIELD_DATABASE),
@@ -317,7 +317,8 @@ public class CanalRecordParser extends RecordParser {
             Map<String, String> mySqlFieldTypes) {
         LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
         mySqlFieldTypes.forEach(
-                (name, type) -> paimonFieldTypes.put(name, 
MySqlTypeUtils.toDataType(type)));
+                (name, type) ->
+                        paimonFieldTypes.put(name, 
MySqlTypeUtils.toDataType(type, typeMapping)));
         return paimonFieldTypes;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
index 47a2a8cb6..8d735b504 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -58,14 +59,13 @@ public class OggRecordParser extends RecordParser {
     private static final String OP_UPDATE = "U";
     private static final String OP_INSERT = "I";
     private static final String OP_DELETE = "D";
-    private final List<ComputedColumn> computedColumns;
 
     public OggRecordParser(
             boolean caseSensitive,
+            TypeMapping typeMapping,
             TableNameConverter tableNameConverter,
             List<ComputedColumn> computedColumns) {
-        super(tableNameConverter, caseSensitive);
-        this.computedColumns = computedColumns;
+        super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 88859a38d..9124a6b5d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -90,8 +90,8 @@ public class MySqlSyncTableAction extends ActionBase {
     private final String table;
     private final Configuration mySqlConfig;
 
-    private final List<String> partitionKeys = new ArrayList<>();
-    private final List<String> primaryKeys = new ArrayList<>();
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> primaryKeys = new ArrayList<>();
 
     private Map<String, String> tableConfig = new HashMap<>();
     private List<String> computedColumnArgs = new ArrayList<>();
@@ -114,7 +114,7 @@ public class MySqlSyncTableAction extends ActionBase {
     }
 
     public MySqlSyncTableAction withPartitionKeys(List<String> partitionKeys) {
-        this.partitionKeys.addAll(partitionKeys);
+        this.partitionKeys = partitionKeys;
         return this;
     }
 
@@ -123,7 +123,7 @@ public class MySqlSyncTableAction extends ActionBase {
     }
 
     public MySqlSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
-        this.primaryKeys.addAll(primaryKeys);
+        this.primaryKeys = primaryKeys;
         return this;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index b64fc9cad..e75c4e05d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -138,12 +138,12 @@ public class MySqlTypeUtils {
 
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    public static DataType toDataType(String mysqlType) {
+    public static DataType toDataType(String mysqlFullType, TypeMapping 
typeMapping) {
         return toDataType(
-                MySqlTypeUtils.getShortType(mysqlType),
-                MySqlTypeUtils.getPrecision(mysqlType),
-                MySqlTypeUtils.getScale(mysqlType),
-                false);
+                MySqlTypeUtils.getShortType(mysqlFullType),
+                MySqlTypeUtils.getPrecision(mysqlFullType),
+                MySqlTypeUtils.getScale(mysqlFullType),
+                typeMapping);
     }
 
     public static DataType toDataType(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index c304909a2..1dd3d2059 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -257,10 +257,6 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
             super(kafkaConfig);
         }
 
-        public KafkaSyncTableActionBuilder withTypeMappingModes(String... 
typeMappingModes) {
-            throw new UnsupportedOperationException();
-        }
-
         public KafkaSyncTableAction build() {
             List<String> args =
                     new ArrayList<>(
@@ -278,6 +274,7 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
 
             args.addAll(listToArgs("--partition-keys", partitionKeys));
             args.addAll(listToArgs("--primary-keys", primaryKeys));
+            args.addAll(listToArgs("--type-mapping", typeMappingModes));
 
             args.addAll(listToMultiArgs("--computed-column", 
computedColumnArgs));
 
@@ -310,10 +307,6 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
             throw new UnsupportedOperationException();
         }
 
-        public KafkaSyncDatabaseActionBuilder withTypeMappingModes(String... 
typeMappingModes) {
-            throw new UnsupportedOperationException();
-        }
-
         public KafkaSyncDatabaseAction build() {
             List<String> args =
                     new ArrayList<>(
@@ -328,6 +321,8 @@ public abstract class KafkaActionITCaseBase extends 
CdcActionITCaseBase {
             args.addAll(nullableToArgs("--including-tables", includingTables));
             args.addAll(nullableToArgs("--excluding-tables", excludingTables));
 
+            args.addAll(listToArgs("--type-mapping", typeMappingModes));
+
             MultipleParameterTool params =
                     
MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
             return (KafkaSyncDatabaseAction)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 93d713248..3dd96fecd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -520,6 +521,41 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         assertTableNotExists(notExistedTables);
     }
 
+    @Test
+    @Timeout(60)
+    public void testTypeMappingToString() throws Exception {
+        final String topic = "map-to-string";
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Write the Canal json into Kafka -------------------
+        writeRecordsToKafka(topic, 
readLines("kafka/canal/database/tostring/canal-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "canal-json");
+        kafkaConfig.put("topic", topic);
+
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withTypeMappingModes(TO_STRING.configString())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t1");
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), 
DataTypes.STRING()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        waitForResult(
+                Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+                getFileStoreTable("t1"),
+                rowType,
+                Collections.singletonList("k1"));
+    }
+
     @Test
     public void testCatalogAndTableConfig() {
         KafkaSyncDatabaseAction action =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 4747334e6..520917dc1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 import static 
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -911,6 +912,39 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
                 Arrays.asList("_id", "_year"));
     }
 
+    @Test
+    @Timeout(60)
+    public void testTypeMappingToString() throws Exception {
+        final String topic = "map-to-string";
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Write the Canal json into Kafka -------------------
+        writeRecordsToKafka(topic, 
readLines("kafka/canal/table/tostring/canal-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "canal-json");
+        kafkaConfig.put("topic", topic);
+
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withTypeMappingModes(TO_STRING.configString())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), 
DataTypes.STRING()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        waitForResult(
+                Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Collections.singletonList("k1"));
+    }
+
     @Test
     public void testCatalogAndTableConfig() {
         KafkaSyncTableAction action =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index a72849427..6f389c220 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 
@@ -50,7 +51,8 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
         kafkaConfig.put("topic", topic);
 
         KafkaSchema kafkaSchema =
-                KafkaSchema.getKafkaSchema(Configuration.fromMap(kafkaConfig), 
topic);
+                KafkaSchema.getKafkaSchema(
+                        Configuration.fromMap(kafkaConfig), topic, 
TypeMapping.defaultMapping());
         Map<String, DataType> fields = new LinkedHashMap<>();
         fields.put("pt", DataTypes.INT());
         fields.put("_id", DataTypes.INT());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/database/tostring/canal-data-1.txt
 
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/database/tostring/canal-data-1.txt
new file mode 100644
index 000000000..b2618acf0
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/database/tostring/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k1":"5","v0":"five","v1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"k1":"7","v0":"seven","v1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/table/tostring/canal-data-1.txt
 
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/table/tostring/canal-data-1.txt
new file mode 100644
index 000000000..b2618acf0
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/table/tostring/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k1":"5","v0":"five","v1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"k1":"7","v0":"seven","v1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}

Reply via email to