This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b5180f3b2 [flink][kafka-cdc] Kafka Canal CDC supports data type to
string mapping (#1883)
b5180f3b2 is described below
commit b5180f3b2eb5da219b20a4fc993ad4793d20c127
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 29 16:08:51 2023 +0800
[flink][kafka-cdc] Kafka Canal CDC supports data type to string mapping
(#1883)
---
docs/content/how-to/cdc-ingestion.md | 2 +
.../shortcodes/generated/kafka_sync_database.html | 4 ++
.../shortcodes/generated/kafka_sync_table.html | 4 ++
.../paimon/flink/action/cdc/kafka/KafkaSchema.java | 8 ++-
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 72 ++++++++++++++++------
.../cdc/kafka/KafkaSyncDatabaseActionFactory.java | 47 +++++++-------
.../action/cdc/kafka/KafkaSyncTableAction.java | 61 +++++++++++++-----
.../cdc/kafka/KafkaSyncTableActionFactory.java | 57 ++++++++---------
.../flink/action/cdc/kafka/formats/DataFormat.java | 4 +-
.../action/cdc/kafka/formats/RecordParser.java | 18 +++++-
.../cdc/kafka/formats/RecordParserFactory.java | 3 +
.../cdc/kafka/formats/canal/CanalRecordParser.java | 13 ++--
.../cdc/kafka/formats/ogg/OggRecordParser.java | 6 +-
.../action/cdc/mysql/MySqlSyncTableAction.java | 8 +--
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 10 +--
.../action/cdc/kafka/KafkaActionITCaseBase.java | 11 +---
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 36 +++++++++++
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 34 ++++++++++
.../flink/action/cdc/kafka/KafkaSchemaITCase.java | 4 +-
.../kafka/canal/database/tostring/canal-data-1.txt | 20 ++++++
.../kafka/canal/table/tostring/canal-data-1.txt | 20 ++++++
21 files changed, 321 insertions(+), 121 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 0e8d78457..188a331d9 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -322,6 +322,7 @@ To use this feature through `flink run`, run the following
shell command.
--table <table-name> \
[--partition-keys <partition-keys>] \
[--primary-keys <primary-keys>] \
+ [--type-mapping to-string] \
[--computed-column <'column-name=expr-name(args[, ...])'>
[--computed-column ...]] \
[--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
@@ -370,6 +371,7 @@ To use this feature through `flink run`, run the following
shell command.
[--table-suffix <paimon-table-suffix>] \
[--including-tables <table-name|name-regular-expr>] \
[--excluding-tables <table-name|name-regular-expr>] \
+ [--type-mapping to-string] \
[--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf
<paimon-table-sink-conf> ...]]
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index ac86c6449..e5d5671bd 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -57,6 +57,10 @@ under the License.
<td><h5>--excluding-tables</h5></td>
<td>It is used to specify which source tables are not to be
synchronized. The usage is same as "--including-tables". "--excluding-tables"
has higher priority than "--including-tables" if you specified both.</td>
</tr>
+ <tr>
+ <td><h5>--type-mapping</h5></td>
+ <td>It is used to specify how to map MySQL data types to Paimon types.
Currently, only the option "to-string" is supported, which maps all MySQL types to STRING.</td>
+ </tr>
<tr>
<td><h5>--kafka-conf</h5></td>
<td>The configuration for Flink Kafka sources. Each configuration
should be specified in the format `key=value`. `properties.bootstrap.servers`,
`topic`, `properties.group.id`, and `value.format` are required
configurations, others are optional.See its <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a>
for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html
b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 339732e38..a4e3e42df 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -45,6 +45,10 @@ under the License.
<td><h5>--primary-keys</h5></td>
<td>The primary keys for Paimon table. If there are multiple primary
keys, connect them with comma, for example "buyer_id,seller_id".</td>
</tr>
+ <tr>
+ <td><h5>--type-mapping</h5></td>
+ <td>It is used to specify how to map MySQL data types to Paimon types.
Currently, only the option "to-string" is supported, which maps all MySQL types to STRING.</td>
+ </tr>
<tr>
<td><h5>--computed-column</h5></td>
<td>The definitions of computed columns. The argument field is from
Kafka topic's table field name. See <a href="#computed-functions">here</a> for
a complete list of configurations. </td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 2e1a2f98e..0a5931955 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.types.DataType;
@@ -119,10 +120,12 @@ public class KafkaSchema {
*
* @param kafkaConfig The configuration for Kafka.
* @param topic The topic to retrieve the schema for.
+ * @param typeMapping data type mapping options.
* @return The Kafka schema for the topic.
* @throws KafkaSchemaRetrievalException If unable to retrieve the schema
after max retries.
*/
- public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String
topic)
+ public static KafkaSchema getKafkaSchema(
+ Configuration kafkaConfig, String topic, TypeMapping typeMapping)
throws KafkaSchemaRetrievalException {
KafkaConsumer<String, String> consumer =
getKafkaEarliestConsumer(kafkaConfig, topic);
int retry = 0;
@@ -130,7 +133,8 @@ public class KafkaSchema {
DataFormat format = getDataFormat(kafkaConfig);
RecordParser recordParser =
- format.createParser(true, new TableNameConverter(true),
Collections.emptyList());
+ format.createParser(
+ true, new TableNameConverter(true), typeMapping,
Collections.emptyList());
while (true) {
ConsumerRecords<String, String> consumerRecords =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 3c271a3ae..fcd879a77 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -40,6 +41,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
@@ -82,32 +84,60 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class KafkaSyncDatabaseAction extends ActionBase {
- private final Configuration kafkaConfig;
private final String database;
- private final String tablePrefix;
- private final String tableSuffix;
- @Nullable private final Pattern includingPattern;
- @Nullable private final Pattern excludingPattern;
- private final Map<String, String> tableConfig;
+ private final Configuration kafkaConfig;
+
+ private Map<String, String> tableConfig = new HashMap<>();
+ private String tablePrefix = "";
+ private String tableSuffix = "";
+ private String includingTables = ".*";
+ @Nullable String excludingTables;
+ private TypeMapping typeMapping = TypeMapping.defaultMapping();
public KafkaSyncDatabaseAction(
- Map<String, String> kafkaConfig,
String warehouse,
String database,
- @Nullable String tablePrefix,
- @Nullable String tableSuffix,
- @Nullable String includingTables,
- @Nullable String excludingTables,
Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
+ Map<String, String> kafkaConfig) {
super(warehouse, catalogConfig);
- this.kafkaConfig = Configuration.fromMap(kafkaConfig);
this.database = database;
- this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
- this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
- this.includingPattern = includingTables == null ? null :
Pattern.compile(includingTables);
- this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
+ this.kafkaConfig = Configuration.fromMap(kafkaConfig);
+ }
+
+ public KafkaSyncDatabaseAction withTableConfig(Map<String, String>
tableConfig) {
this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public KafkaSyncDatabaseAction withTablePrefix(@Nullable String
tablePrefix) {
+ if (tablePrefix != null) {
+ this.tablePrefix = tablePrefix;
+ }
+ return this;
+ }
+
+ public KafkaSyncDatabaseAction withTableSuffix(@Nullable String
tableSuffix) {
+ if (tableSuffix != null) {
+ this.tableSuffix = tableSuffix;
+ }
+ return this;
+ }
+
+ public KafkaSyncDatabaseAction includingTables(@Nullable String
includingTables) {
+ if (includingTables != null) {
+ this.includingTables = includingTables;
+ }
+ return this;
+ }
+
+ public KafkaSyncDatabaseAction excludingTables(@Nullable String
excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
+ }
+
+ public KafkaSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
}
public void build(StreamExecutionEnvironment env) throws Exception {
@@ -125,11 +155,13 @@ public class KafkaSyncDatabaseAction extends ActionBase {
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
RecordParser recordParser =
- format.createParser(caseSensitive, tableNameConverter,
Collections.emptyList());
+ format.createParser(
+ caseSensitive, tableNameConverter, typeMapping,
Collections.emptyList());
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
- Pattern includingPattern = this.includingPattern;
- Pattern excludingPattern = this.excludingPattern;
+ Pattern includingPattern = Pattern.compile(includingTables);
+ Pattern excludingPattern =
+ excludingTables == null ? null :
Pattern.compile(excludingTables);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index 354b5405f..9a33e6f91 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -20,10 +20,10 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.utils.MultipleParameterTool;
-import java.util.Map;
import java.util.Optional;
/** Factory to create {@link KafkaSyncDatabaseAction}. */
@@ -38,31 +38,27 @@ public class KafkaSyncDatabaseActionFactory implements
ActionFactory {
@Override
public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "warehouse");
- checkRequiredArgument(params, "database");
checkRequiredArgument(params, "kafka-conf");
- String warehouse = params.get("warehouse");
- String database = params.get("database");
- String tablePrefix = params.get("table-prefix");
- String tableSuffix = params.get("table-suffix");
- String includingTables = params.get("including-tables");
- String excludingTables = params.get("excluding-tables");
-
- Map<String, String> kafkaConfigOption = optionalConfigMap(params,
"kafka-conf");
- Map<String, String> catalogConfigOption = optionalConfigMap(params,
"catalog-conf");
- Map<String, String> tableConfigOption = optionalConfigMap(params,
"table-conf");
- return Optional.of(
+ KafkaSyncDatabaseAction action =
new KafkaSyncDatabaseAction(
- kafkaConfigOption,
- warehouse,
- database,
- tablePrefix,
- tableSuffix,
- includingTables,
- excludingTables,
- catalogConfigOption,
- tableConfigOption));
+ getRequiredValue(params, "warehouse"),
+ getRequiredValue(params, "database"),
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "kafka-conf"));
+
+ action.withTableConfig(optionalConfigMap(params, "table-conf"))
+ .withTablePrefix(params.get("table-prefix"))
+ .withTableSuffix(params.get("table-suffix"))
+ .includingTables(params.get("including-tables"))
+ .excludingTables(params.get("excluding-tables"));
+
+ if (params.has("type-mapping")) {
+ String[] options = params.get("type-mapping").split(",");
+ action.withTypeMapping(TypeMapping.parse(options));
+ }
+
+ return Optional.of(action);
}
@Override
@@ -81,6 +77,7 @@ public class KafkaSyncDatabaseActionFactory implements
ActionFactory {
+ "[--table-suffix <paimon-table-suffix>] "
+ "[--including-tables <table-name|name-regular-expr>]
"
+ "[--excluding-tables <table-name|name-regular-expr>]
"
+ + "[--type-mapping <option1,option2...>] "
+ "[--kafka-conf <kafka-source-conf> [--kafka-conf
<kafka-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf>
[--catalog-conf <paimon-catalog-conf> ...]] "
+ "[--table-conf <paimon-table-sink-conf>
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -102,6 +99,10 @@ public class KafkaSyncDatabaseActionFactory implements
ActionFactory {
"--excluding-tables has higher priority than
--including-tables if you specified both.");
System.out.println();
+ System.out.println(
+ "--type-mapping is used to specify how to map MySQL type to
Paimon type. Please see the doc for usage.");
+ System.out.println();
+
System.out.println("kafka source conf syntax:");
System.out.println(" key=value");
System.out.println(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index f55b3f670..2dffcfcab 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
@@ -41,6 +42,9 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,40 +84,66 @@ import static
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComput
*/
public class KafkaSyncTableAction extends ActionBase {
- private final Configuration kafkaConfig;
private final String database;
private final String table;
- private final List<String> partitionKeys;
- private final List<String> primaryKeys;
+ private final Configuration kafkaConfig;
- private final List<String> computedColumnArgs;
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> primaryKeys = new ArrayList<>();
- private final Map<String, String> tableConfig;
+ private Map<String, String> tableConfig = new HashMap<>();
+ private List<String> computedColumnArgs = new ArrayList<>();
+ private TypeMapping typeMapping = TypeMapping.defaultMapping();
public KafkaSyncTableAction(
- Map<String, String> kafkaConfig,
String warehouse,
String database,
String table,
- List<String> partitionKeys,
- List<String> primaryKeys,
- List<String> computedColumnArgs,
Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
+ Map<String, String> kafkaConfig) {
super(warehouse, catalogConfig);
- this.kafkaConfig = Configuration.fromMap(kafkaConfig);
this.database = database;
this.table = table;
+ this.kafkaConfig = Configuration.fromMap(kafkaConfig);
+ }
+
+ public KafkaSyncTableAction withPartitionKeys(String... partitionKeys) {
+ return withPartitionKeys(Arrays.asList(partitionKeys));
+ }
+
+ public KafkaSyncTableAction withPartitionKeys(List<String> partitionKeys) {
this.partitionKeys = partitionKeys;
+ return this;
+ }
+
+ public KafkaSyncTableAction withPrimaryKeys(String... primaryKeys) {
+ return withPrimaryKeys(Arrays.asList(primaryKeys));
+ }
+
+ public KafkaSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
this.primaryKeys = primaryKeys;
- this.computedColumnArgs = computedColumnArgs;
+ return this;
+ }
+
+ public KafkaSyncTableAction withTableConfig(Map<String, String>
tableConfig) {
this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public KafkaSyncTableAction withComputedColumnArgs(List<String>
computedColumnArgs) {
+ this.computedColumnArgs = computedColumnArgs;
+ return this;
+ }
+
+ public KafkaSyncTableAction withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
}
public void build(StreamExecutionEnvironment env) throws Exception {
KafkaSource<String> source =
KafkaActionUtils.buildKafkaSource(kafkaConfig);
String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0);
- KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig,
topic);
+ KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig,
topic, typeMapping);
catalog.createDatabase(database, true);
boolean caseSensitive = catalog.caseSensitive();
@@ -140,7 +170,10 @@ public class KafkaSyncTableAction extends ActionBase {
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
RecordParser recordParser =
format.createParser(
- caseSensitive, new TableNameConverter(caseSensitive),
computedColumns);
+ caseSensitive,
+ new TableNameConverter(caseSensitive),
+ typeMapping,
+ computedColumns);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
RichCdcMultiplexRecordEventParser::new;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
index ad748713e..3af27df4c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
@@ -20,17 +20,13 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/** Factory to create {@link KafkaSyncTableAction}. */
public class KafkaSyncTableActionFactory implements ActionFactory {
@@ -45,42 +41,36 @@ public class KafkaSyncTableActionFactory implements
ActionFactory {
@Override
public Optional<Action> create(MultipleParameterTool params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
+ checkRequiredArgument(params, "kafka-conf");
+
+ KafkaSyncTableAction action =
+ new KafkaSyncTableAction(
+ tablePath.f0,
+ tablePath.f1,
+ tablePath.f2,
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "kafka-conf"))
+ .withTableConfig(optionalConfigMap(params,
"table-conf"));
- List<String> partitionKeys = Collections.emptyList();
if (params.has("partition-keys")) {
- partitionKeys =
- Arrays.stream(params.get("partition-keys").split(","))
- .collect(Collectors.toList());
+ action.withPartitionKeys(params.get("partition-keys").split(","));
}
- List<String> primaryKeys = Collections.emptyList();
if (params.has("primary-keys")) {
- primaryKeys =
- Arrays.stream(params.get("primary-keys").split(","))
- .collect(Collectors.toList());
+ action.withPrimaryKeys(params.get("primary-keys").split(","));
}
- List<String> computedColumnArgs = Collections.emptyList();
+
if (params.has("computed-column")) {
- computedColumnArgs = new
ArrayList<>(params.getMultiParameter("computed-column"));
+ action.withComputedColumnArgs(
+ new
ArrayList<>(params.getMultiParameter("computed-column")));
}
- checkRequiredArgument(params, "kafka-conf");
-
- Map<String, String> kafkaConfig = optionalConfigMap(params,
"kafka-conf");
- Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
- Map<String, String> paimonConfig = optionalConfigMap(params,
"table-conf");
+ if (params.has("type-mapping")) {
+ String[] options = params.get("type-mapping").split(",");
+ action.withTypeMapping(TypeMapping.parse(options));
+ }
- return Optional.of(
- new KafkaSyncTableAction(
- kafkaConfig,
- tablePath.f0,
- tablePath.f1,
- tablePath.f2,
- partitionKeys,
- primaryKeys,
- computedColumnArgs,
- catalogConfig,
- paimonConfig));
+ return Optional.of(action);
}
@Override
@@ -96,6 +86,7 @@ public class KafkaSyncTableActionFactory implements
ActionFactory {
+ "--table <table-name> "
+ "[--partition-keys <partition-keys>] "
+ "[--primary-keys <primary-keys>] "
+ + "[--type-mapping <option1,option2...>] "
+ "[--computed-column <'column-name=expr-name(args[,
...])'> [--computed-column ...]] "
+ "[--kafka-conf <kafka-source-conf> [--kafka-conf
<kafka-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf>
[--catalog-conf <paimon-catalog-conf> ...]] "
@@ -114,6 +105,10 @@ public class KafkaSyncTableActionFactory implements
ActionFactory {
System.out.println("Primary keys will be derived from tables if not
specified.");
System.out.println();
+ System.out.println(
+ "--type-mapping is used to specify how to map MySQL type to
Paimon type. Please see the doc for usage.");
+ System.out.println();
+
System.out.println("Please see doc for usage of --computed-column.");
System.out.println();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
index 4a95b88e9..155532900 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.formats;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
@@ -58,8 +59,9 @@ public enum DataFormat {
public RecordParser createParser(
boolean caseSensitive,
TableNameConverter tableNameConverter,
+ TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- return parser.createParser(caseSensitive, tableNameConverter,
computedColumns);
+ return parser.createParser(caseSensitive, typeMapping,
tableNameConverter, computedColumns);
}
/**
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 23de278b3..8527d93b6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -18,7 +18,9 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.utils.StringUtils;
@@ -50,11 +52,15 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
protected final TableNameConverter tableNameConverter;
+ protected final boolean caseSensitive;
+ protected final TypeMapping typeMapping;
+ protected final List<ComputedColumn> computedColumns;
+
protected JsonNode root;
protected String databaseName;
protected String tableName;
- protected boolean caseSensitive;
protected abstract List<RichCdcMultiplexRecord> extractRecords();
@@ -64,9 +70,15 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
public abstract KafkaSchema getKafkaSchema(String record);
- public RecordParser(TableNameConverter tableNameConverter, boolean
caseSensitive) {
- this.tableNameConverter = tableNameConverter;
+ public RecordParser(
+ boolean caseSensitive,
+ TypeMapping typeMapping,
+ TableNameConverter tableNameConverter,
+ List<ComputedColumn> computedColumns) {
this.caseSensitive = caseSensitive;
+ this.typeMapping = typeMapping;
+ this.tableNameConverter = tableNameConverter;
+ this.computedColumns = computedColumns;
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
index ee4efe297..413ecbc4f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.formats;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import java.util.List;
@@ -38,12 +39,14 @@ public interface RecordParserFactory {
* Creates a new instance of {@link RecordParser} with the specified
configurations.
*
* @param caseSensitive Indicates whether the parser should be
case-sensitive.
+ * @param typeMapping Data type mapping options.
* @param tableNameConverter Converter to transform table names.
* @param computedColumns List of computed columns to be considered by the
parser.
* @return A new instance of {@link RecordParser}.
*/
RecordParser createParser(
boolean caseSensitive,
+ TypeMapping typeMapping,
TableNameConverter tableNameConverter,
List<ComputedColumn> computedColumns);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 17321214e..5f3b82c40 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -20,6 +20,7 @@ package
org.apache.paimon.flink.action.cdc.kafka.formats.canal;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
@@ -71,14 +72,12 @@ public class CanalRecordParser extends RecordParser {
private static final String OP_INSERT = "INSERT";
private static final String OP_DELETE = "DELETE";
- private final List<ComputedColumn> computedColumns;
-
public CanalRecordParser(
boolean caseSensitive,
+ TypeMapping typeMapping,
TableNameConverter tableNameConverter,
List<ComputedColumn> computedColumns) {
- super(tableNameConverter, caseSensitive);
- this.computedColumns = computedColumns;
+ super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
}
@Override
@@ -123,7 +122,8 @@ public class CanalRecordParser extends RecordParser {
LinkedHashMap<String, String> mySqlFieldTypes =
extractFieldTypesFromMySqlType();
LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
mySqlFieldTypes.forEach(
- (name, type) -> paimonFieldTypes.put(name,
MySqlTypeUtils.toDataType(type)));
+ (name, type) ->
+ paimonFieldTypes.put(name,
MySqlTypeUtils.toDataType(type, typeMapping)));
return new KafkaSchema(
extractString(FIELD_DATABASE),
@@ -317,7 +317,8 @@ public class CanalRecordParser extends RecordParser {
Map<String, String> mySqlFieldTypes) {
LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
mySqlFieldTypes.forEach(
- (name, type) -> paimonFieldTypes.put(name,
MySqlTypeUtils.toDataType(type)));
+ (name, type) ->
+ paimonFieldTypes.put(name,
MySqlTypeUtils.toDataType(type, typeMapping)));
return paimonFieldTypes;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
index 47a2a8cb6..8d735b504 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -58,14 +59,13 @@ public class OggRecordParser extends RecordParser {
private static final String OP_UPDATE = "U";
private static final String OP_INSERT = "I";
private static final String OP_DELETE = "D";
- private final List<ComputedColumn> computedColumns;
public OggRecordParser(
boolean caseSensitive,
+ TypeMapping typeMapping,
TableNameConverter tableNameConverter,
List<ComputedColumn> computedColumns) {
- super(tableNameConverter, caseSensitive);
- this.computedColumns = computedColumns;
+ super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 88859a38d..9124a6b5d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -90,8 +90,8 @@ public class MySqlSyncTableAction extends ActionBase {
private final String table;
private final Configuration mySqlConfig;
- private final List<String> partitionKeys = new ArrayList<>();
- private final List<String> primaryKeys = new ArrayList<>();
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> primaryKeys = new ArrayList<>();
private Map<String, String> tableConfig = new HashMap<>();
private List<String> computedColumnArgs = new ArrayList<>();
@@ -114,7 +114,7 @@ public class MySqlSyncTableAction extends ActionBase {
}
public MySqlSyncTableAction withPartitionKeys(List<String> partitionKeys) {
- this.partitionKeys.addAll(partitionKeys);
+ this.partitionKeys = partitionKeys;
return this;
}
@@ -123,7 +123,7 @@ public class MySqlSyncTableAction extends ActionBase {
}
public MySqlSyncTableAction withPrimaryKeys(List<String> primaryKeys) {
- this.primaryKeys.addAll(primaryKeys);
+ this.primaryKeys = primaryKeys;
return this;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index b64fc9cad..e75c4e05d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -138,12 +138,12 @@ public class MySqlTypeUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
- public static DataType toDataType(String mysqlType) {
+ public static DataType toDataType(String mysqlFullType, TypeMapping
typeMapping) {
return toDataType(
- MySqlTypeUtils.getShortType(mysqlType),
- MySqlTypeUtils.getPrecision(mysqlType),
- MySqlTypeUtils.getScale(mysqlType),
- false);
+ MySqlTypeUtils.getShortType(mysqlFullType),
+ MySqlTypeUtils.getPrecision(mysqlFullType),
+ MySqlTypeUtils.getScale(mysqlFullType),
+ typeMapping);
}
public static DataType toDataType(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index c304909a2..1dd3d2059 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -257,10 +257,6 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
super(kafkaConfig);
}
- public KafkaSyncTableActionBuilder withTypeMappingModes(String...
typeMappingModes) {
- throw new UnsupportedOperationException();
- }
-
public KafkaSyncTableAction build() {
List<String> args =
new ArrayList<>(
@@ -278,6 +274,7 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
args.addAll(listToArgs("--partition-keys", partitionKeys));
args.addAll(listToArgs("--primary-keys", primaryKeys));
+ args.addAll(listToArgs("--type-mapping", typeMappingModes));
args.addAll(listToMultiArgs("--computed-column",
computedColumnArgs));
@@ -310,10 +307,6 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
throw new UnsupportedOperationException();
}
- public KafkaSyncDatabaseActionBuilder withTypeMappingModes(String...
typeMappingModes) {
- throw new UnsupportedOperationException();
- }
-
public KafkaSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
@@ -328,6 +321,8 @@ public abstract class KafkaActionITCaseBase extends
CdcActionITCaseBase {
args.addAll(nullableToArgs("--including-tables", includingTables));
args.addAll(nullableToArgs("--excluding-tables", excludingTables));
+ args.addAll(listToArgs("--type-mapping", typeMappingModes));
+
MultipleParameterTool params =
MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
return (KafkaSyncDatabaseAction)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 93d713248..3dd96fecd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -520,6 +521,41 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
assertTableNotExists(notExistedTables);
}
+ @Test
+ @Timeout(60)
+ public void testTypeMappingToString() throws Exception {
+ final String topic = "map-to-string";
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Write the Canal json into Kafka -------------------
+ writeRecordsToKafka(topic,
readLines("kafka/canal/database/tostring/canal-data-1.txt"));
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "canal-json");
+ kafkaConfig.put("topic", topic);
+
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withTypeMappingModes(TO_STRING.configString())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ waitingTables("t1");
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(), DataTypes.STRING(),
DataTypes.STRING()
+ },
+ new String[] {"k1", "v0", "v1"});
+ waitForResult(
+ Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+ getFileStoreTable("t1"),
+ rowType,
+ Collections.singletonList("k1"));
+ }
+
@Test
public void testCatalogAndTableConfig() {
KafkaSyncDatabaseAction action =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 4747334e6..520917dc1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -911,6 +912,39 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaActionITCaseBase {
Arrays.asList("_id", "_year"));
}
+ @Test
+ @Timeout(60)
+ public void testTypeMappingToString() throws Exception {
+ final String topic = "map-to-string";
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Write the Canal json into Kafka -------------------
+ writeRecordsToKafka(topic,
readLines("kafka/canal/table/tostring/canal-data-1.txt"));
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "canal-json");
+ kafkaConfig.put("topic", topic);
+
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withTypeMappingModes(TO_STRING.configString())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(), DataTypes.STRING(),
DataTypes.STRING()
+ },
+ new String[] {"k1", "v0", "v1"});
+ waitForResult(
+ Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.singletonList("k1"));
+ }
+
@Test
public void testCatalogAndTableConfig() {
KafkaSyncTableAction action =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index a72849427..6f389c220 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -50,7 +51,8 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
kafkaConfig.put("topic", topic);
KafkaSchema kafkaSchema =
- KafkaSchema.getKafkaSchema(Configuration.fromMap(kafkaConfig),
topic);
+ KafkaSchema.getKafkaSchema(
+ Configuration.fromMap(kafkaConfig), topic,
TypeMapping.defaultMapping());
Map<String, DataType> fields = new LinkedHashMap<>();
fields.put("pt", DataTypes.INT());
fields.put("_id", DataTypes.INT());
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/database/tostring/canal-data-1.txt
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/database/tostring/canal-data-1.txt
new file mode 100644
index 000000000..b2618acf0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/database/tostring/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k1":"5","v0":"five","v1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"k1":"7","v0":"seven","v1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/table/tostring/canal-data-1.txt
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/table/tostring/canal-data-1.txt
new file mode 100644
index 000000000..b2618acf0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/resources/kafka/canal/table/tostring/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k1":"5","v0":"five","v1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"k1":"7","v0":"seven","v1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}