This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fdfa6d99d [flink][cdc] Extract common util methods (#1926)
fdfa6d99d is described below

commit fdfa6d99d7d4ec3d9eb948589da338f0e16d8050
Author: yuzelin <[email protected]>
AuthorDate: Thu Aug 31 18:55:38 2023 +0800

    [flink][cdc] Extract common util methods (#1926)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     | 253 +++++++++++++++++++++
 .../flink/action/cdc/kafka/KafkaActionUtils.java   | 109 ++-------
 .../paimon/flink/action/cdc/kafka/KafkaSchema.java |   8 +-
 .../action/cdc/kafka/KafkaSyncTableAction.java     |   3 +-
 .../action/cdc/kafka/formats/RecordParser.java     |  21 --
 .../cdc/kafka/formats/canal/CanalRecordParser.java |   8 +-
 .../cdc/kafka/formats/ogg/OggRecordParser.java     |   4 +-
 .../action/cdc/mongodb/MongoDBActionUtils.java     |  50 ++--
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |  50 +---
 .../flink/action/cdc/mongodb/MongodbSchema.java    |  20 +-
 .../cdc/mongodb/strategy/MongoVersionStrategy.java |  27 +--
 .../flink/action/cdc/mysql/MySqlActionUtils.java   | 121 ++--------
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |  20 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  67 +-----
 .../action/cdc/mysql/MySqlSyncTableAction.java     |   3 +-
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  |  23 +-
 .../flink/action/cdc/mysql/schema/MySqlSchema.java |  11 +-
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |   4 +-
 .../cdc/kafka/KafkaOggSyncTableActionITCase.java   |   4 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  10 +-
 20 files changed, 391 insertions(+), 425 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
new file mode 100644
index 000000000..4d0af4d37
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Common utils for CDC Action. */
+public class CdcActionCommonUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CdcActionCommonUtils.class);
+
+    public static void assertSchemaCompatible(
+            TableSchema paimonSchema, List<DataField> sourceTableFields) {
+        if (!schemaCompatible(paimonSchema, sourceTableFields)) {
+            throw new IllegalArgumentException(
+                    "Paimon schema and source table schema are not 
compatible.\n"
+                            + "Paimon fields are: "
+                            + paimonSchema.fields()
+                            + ".\nSource table fields are: "
+                            + sourceTableFields);
+        }
+    }
+
+    public static boolean schemaCompatible(
+            TableSchema paimonSchema, List<DataField> sourceTableFields) {
+        for (DataField field : sourceTableFields) {
+            int idx = paimonSchema.fieldNames().indexOf(field.name());
+            if (idx < 0) {
+                LOG.info("Cannot find field '{}' in Paimon table.", 
field.name());
+                return false;
+            }
+            DataType type = paimonSchema.fields().get(idx).type();
+            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
+                    != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) 
{
+                LOG.info(
+                        "Cannot convert field '{}' from source table type '{}' 
to Paimon type '{}'.",
+                        field.name(),
+                        field.type(),
+                        type);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static <T> LinkedHashMap<String, T> mapKeyCaseConvert(
+            LinkedHashMap<String, T> origin,
+            boolean caseSensitive,
+            Function<String, String> duplicateErrMsg) {
+        return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg, 
LinkedHashMap::new);
+    }
+
+    public static <T> Map<String, T> mapKeyCaseConvert(
+            Map<String, T> origin,
+            boolean caseSensitive,
+            Function<String, String> duplicateErrMsg) {
+        return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg, 
HashMap::new);
+    }
+
+    private static <T, M extends Map<String, T>> M mapKeyCaseConvert(
+            M origin,
+            boolean caseSensitive,
+            Function<String, String> duplicateErrMsg,
+            Supplier<M> mapSupplier) {
+        if (caseSensitive) {
+            return origin;
+        } else {
+            M newMap = mapSupplier.get();
+            for (Map.Entry<String, T> entry : origin.entrySet()) {
+                String key = entry.getKey();
+                checkArgument(!newMap.containsKey(key.toLowerCase()), 
duplicateErrMsg.apply(key));
+                newMap.put(key.toLowerCase(), entry.getValue());
+            }
+            return newMap;
+        }
+    }
+
+    public static Function<String, String> columnDuplicateErrMsg(String 
tableName) {
+        return column ->
+                String.format(
+                        "Failed to convert columns of table '%s' to 
case-insensitive form because duplicate column found: '%s'.",
+                        tableName, column);
+    }
+
+    public static Function<String, String> 
recordKeyDuplicateErrMsg(Map<String, String> record) {
+        return column ->
+                "Failed to convert record map to case-insensitive form because 
duplicate column found. Original record map is:\n"
+                        + record;
+    }
+
+    public static List<String> listCaseConvert(List<String> origin, boolean 
caseSensitive) {
+        return caseSensitive
+                ? origin
+                : 
origin.stream().map(String::toLowerCase).collect(Collectors.toList());
+    }
+
+    public static Schema buildPaimonSchema(
+            List<String> specifiedPartitionKeys,
+            List<String> specifiedPrimaryKeys,
+            List<ComputedColumn> computedColumns,
+            Map<String, String> tableConfig,
+            LinkedHashMap<String, DataType> sourceColumns,
+            @Nullable List<String> sourceColumnComments,
+            List<String> sourcePrimaryKeys) {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // options
+        builder.options(tableConfig);
+
+        // columns
+        if (sourceColumnComments != null) {
+            checkArgument(
+                    sourceColumns.size() == sourceColumnComments.size(),
+                    "Source table columns count and column comments count 
should be equal.");
+
+            int i = 0;
+            for (Map.Entry<String, DataType> entry : sourceColumns.entrySet()) 
{
+                builder.column(entry.getKey(), entry.getValue(), 
sourceColumnComments.get(i++));
+            }
+        } else {
+            sourceColumns.forEach(builder::column);
+        }
+
+        for (ComputedColumn computedColumn : computedColumns) {
+            builder.column(computedColumn.columnName(), 
computedColumn.columnType());
+        }
+
+        // primary keys
+        if (!specifiedPrimaryKeys.isEmpty()) {
+            for (String key : specifiedPrimaryKeys) {
+                if (!sourceColumns.containsKey(key)
+                        && computedColumns.stream().noneMatch(c -> 
c.columnName().equals(key))) {
+                    throw new IllegalArgumentException(
+                            "Specified primary key '"
+                                    + key
+                                    + "' does not exist in source tables or 
computed columns.");
+                }
+            }
+            builder.primaryKey(specifiedPrimaryKeys);
+        } else if (!sourcePrimaryKeys.isEmpty()) {
+            builder.primaryKey(sourcePrimaryKeys);
+        } else {
+            throw new IllegalArgumentException(
+                    "Primary keys are not specified. "
+                            + "Also, can't infer primary keys from source 
table schemas because "
+                            + "source tables have no primary keys or have 
different primary keys.");
+        }
+
+        // partition keys
+        if (!specifiedPartitionKeys.isEmpty()) {
+            builder.partitionKeys(specifiedPartitionKeys);
+        }
+
+        return builder.build();
+    }
+
+    public static String tableList(
+            MultiTablesSinkMode mode,
+            String databasePattern,
+            String includingTablePattern,
+            List<Identifier> monitoredTables,
+            List<Identifier> excludedTables) {
+        if (mode == DIVIDED) {
+            return dividedModeTableList(monitoredTables);
+        } else if (mode == COMBINED) {
+            return combinedModeTableList(databasePattern, 
includingTablePattern, excludedTables);
+        }
+
+        throw new UnsupportedOperationException("Unknown MultiTablesSinkMode: 
" + mode);
+    }
+
+    private static String dividedModeTableList(List<Identifier> 
monitoredTables) {
+        // In DIVIDED mode, we only concern about existed tables
+        return monitoredTables.stream()
+                .map(t -> t.getDatabaseName() + "\\." + t.getObjectName())
+                .collect(Collectors.joining("|"));
+    }
+
+    public static String combinedModeTableList(
+            String databasePattern, String includingTablePattern, 
List<Identifier> excludedTables) {
+        // In COMBINED mode, we should consider both existed tables
+        // and possible newly created
+        // tables, so we should use regular expression to monitor all valid 
tables and exclude
+        // certain invalid tables
+
+        // The table list is built by template:
+        // 
(?!(^db\\.tbl$)|(^...$))((databasePattern)\\.(including_pattern1|...))
+
+        // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
+        // is exactly equal to 'db.tbl'
+        // The including pattern (databasePattern)\\.(including_pattern1|...) 
can include tables
+        // whose qualified name matches one of the patterns
+
+        // a table can be monitored only when its name meets the including 
pattern and doesn't
+        // be excluded by excluding pattern at the same time
+        String includingPattern =
+                String.format("(%s)\\.(%s)", databasePattern, 
includingTablePattern);
+
+        if (excludedTables.isEmpty()) {
+            return includingPattern;
+        }
+
+        String excludingPattern =
+                excludedTables.stream()
+                        .map(
+                                t ->
+                                        String.format(
+                                                "(^%s$)",
+                                                t.getDatabaseName() + "\\." + 
t.getObjectName()))
+                        .collect(Collectors.joining("|"));
+        excludingPattern = "?!" + excludingPattern;
+        return String.format("(%s)(%s)", excludingPattern, includingPattern);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 3fae54f01..02a3b0640 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -18,11 +18,9 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.StringUtils;
 
@@ -52,6 +50,9 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 class KafkaActionUtils {
@@ -61,98 +62,28 @@ class KafkaActionUtils {
     private static final String PARTITION = "partition";
     private static final String OFFSET = "offset";
 
-    static void assertSchemaCompatible(TableSchema tableSchema, Schema 
kafkaCanalSchema) {
-        if (!schemaCompatible(tableSchema, kafkaCanalSchema)) {
-            throw new IllegalArgumentException(
-                    "Paimon schema and Kafka schema are not compatible.\n"
-                            + "Paimon fields are: "
-                            + tableSchema.fields()
-                            + ".\nKafka fields are: "
-                            + kafkaCanalSchema.fields());
-        }
-    }
-
-    static boolean schemaCompatible(TableSchema paimonSchema, Schema 
kafkaCanalSchema) {
-        for (DataField field : kafkaCanalSchema.fields()) {
-            int idx = paimonSchema.fieldNames().indexOf(field.name());
-            if (idx < 0) {
-                return false;
-            }
-            DataType type = paimonSchema.fields().get(idx).type();
-            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
-                    != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) 
{
-                return false;
-            }
-        }
-        return true;
-    }
-
     static Schema buildPaimonSchema(
             KafkaSchema kafkaSchema,
             List<String> specifiedPartitionKeys,
             List<String> specifiedPrimaryKeys,
             List<ComputedColumn> computedColumns,
-            Map<String, String> paimonConfig,
+            Map<String, String> tableConfig,
             boolean caseSensitive) {
-        Schema.Builder builder = Schema.newBuilder();
-        builder.options(paimonConfig);
-
-        // build columns and primary keys from mySqlSchema
-        Map<String, DataType> mySqlFields;
-        List<String> mySqlPrimaryKeys;
-        if (caseSensitive) {
-            mySqlFields = kafkaSchema.fields();
-            mySqlPrimaryKeys = kafkaSchema.primaryKeys();
-        } else {
-            mySqlFields = new LinkedHashMap<>();
-            for (Map.Entry<String, DataType> entry : 
kafkaSchema.fields().entrySet()) {
-                String fieldName = entry.getKey();
-                checkArgument(
-                        !mySqlFields.containsKey(fieldName.toLowerCase()),
-                        String.format(
-                                "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
-                                fieldName, kafkaSchema.tableName()));
-                mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
-            }
-            mySqlPrimaryKeys =
-                    kafkaSchema.primaryKeys().stream()
-                            .map(String::toLowerCase)
-                            .collect(Collectors.toList());
-        }
-
-        for (Map.Entry<String, DataType> entry : mySqlFields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue(), null);
-        }
-
-        for (ComputedColumn computedColumn : computedColumns) {
-            builder.column(computedColumn.columnName(), 
computedColumn.columnType());
-        }
-
-        if (specifiedPrimaryKeys.size() > 0) {
-            for (String key : specifiedPrimaryKeys) {
-                if (!mySqlFields.containsKey(key)
-                        && computedColumns.stream().noneMatch(c -> 
c.columnName().equals(key))) {
-                    throw new IllegalArgumentException(
-                            "Specified primary key "
-                                    + key
-                                    + " does not exist in kafka topic's table 
or computed columns.");
-                }
-            }
-            builder.primaryKey(specifiedPrimaryKeys);
-        } else if (mySqlPrimaryKeys.size() > 0) {
-            builder.primaryKey(mySqlPrimaryKeys);
-        } else {
-            throw new IllegalArgumentException(
-                    "Primary keys are not specified. "
-                            + "Also, can't infer primary keys from kafka 
topic's table schemas because "
-                            + "Kafka topic's table have no primary keys or 
have different primary keys.");
-        }
-
-        if (specifiedPartitionKeys.size() > 0) {
-            builder.partitionKeys(specifiedPartitionKeys);
-        }
-
-        return builder.build();
+        LinkedHashMap<String, DataType> sourceColumns =
+                mapKeyCaseConvert(
+                        kafkaSchema.fields(),
+                        caseSensitive,
+                        columnDuplicateErrMsg(kafkaSchema.tableName()));
+        List<String> primaryKeys = listCaseConvert(kafkaSchema.primaryKeys(), 
caseSensitive);
+
+        return CdcActionCommonUtils.buildPaimonSchema(
+                specifiedPartitionKeys,
+                specifiedPrimaryKeys,
+                computedColumns,
+                tableConfig,
+                sourceColumns,
+                null,
+                primaryKeys);
     }
 
     static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 0a5931955..58ff74d84 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -37,8 +37,8 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
@@ -55,13 +55,13 @@ public class KafkaSchema {
     private static final int POLL_TIMEOUT_MILLIS = 1000;
     private final String databaseName;
     private final String tableName;
-    private final Map<String, DataType> fields;
+    private final LinkedHashMap<String, DataType> fields;
     private final List<String> primaryKeys;
 
     public KafkaSchema(
             String databaseName,
             String tableName,
-            Map<String, DataType> fields,
+            LinkedHashMap<String, DataType> fields,
             List<String> primaryKeys) {
         this.databaseName = databaseName;
         this.tableName = tableName;
@@ -77,7 +77,7 @@ public class KafkaSchema {
         return databaseName;
     }
 
-    public Map<String, DataType> fields() {
+    public LinkedHashMap<String, DataType> fields() {
         return fields;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index ef3dee8da..d8669eba3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -163,7 +164,7 @@ public class KafkaSyncTableAction extends ActionBase {
                         caseSensitive);
         try {
             table = (FileStoreTable) catalog.getTable(identifier);
-            KafkaActionUtils.assertSchemaCompatible(table.schema(), fromCanal);
+            CdcActionCommonUtils.assertSchemaCompatible(table.schema(), 
fromCanal.fields());
         } catch (Catalog.TableNotExistException e) {
             catalog.createTable(identifier, fromCanal, false);
             table = (FileStoreTable) catalog.getTable(identifier);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 8527d93b6..01a744e96 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -31,14 +31,10 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /**
  * Provides a base implementation for parsing messages of various formats into 
{@link
  * RichCdcMultiplexRecord} objects.
@@ -92,19 +88,6 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
         extractRecords().forEach(out::collect);
     }
 
-    protected Map<String, String> keyCaseInsensitive(Map<String, String> 
origin) {
-        Map<String, String> keyCaseInsensitive = new HashMap<>();
-        for (Map.Entry<String, String> entry : origin.entrySet()) {
-            String fieldName = entry.getKey().toLowerCase();
-            checkArgument(
-                    !keyCaseInsensitive.containsKey(fieldName),
-                    "Duplicate key appears when converting map keys to 
case-insensitive form. Original map is:\n%s",
-                    origin);
-            keyCaseInsensitive.put(fieldName, entry.getValue());
-        }
-        return keyCaseInsensitive;
-    }
-
     protected List<String> extractPrimaryKeys(String primaryKeys) {
         return StreamSupport.stream(root.get(primaryKeys).spliterator(), false)
                 .map(pk -> toFieldName(pk.asText()))
@@ -114,8 +97,4 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     protected String toFieldName(String rawName) {
         return StringUtils.caseSensitiveConversion(rawName, caseSensitive);
     }
-
-    protected Map<String, String> adjustCase(Map<String, String> map) {
-        return caseSensitive ? map : keyCaseInsensitive(map);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 5f3b82c40..2cc3dd43e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -45,6 +45,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
@@ -256,7 +258,7 @@ public class CanalRecordParser extends RecordParser {
         for (JsonNode datum : data) {
             Map<String, String> rowData =
                     extractRowFromJson(datum, mySqlFieldTypes, 
paimonFieldTypes);
-            rowData = adjustCase(rowData);
+            rowData = mapKeyCaseConvert(rowData, caseSensitive, 
recordKeyDuplicateErrMsg(rowData));
 
             records.add(
                     new RichCdcMultiplexRecord(
@@ -291,7 +293,7 @@ public class CanalRecordParser extends RecordParser {
                     before.putIfAbsent(entry.getKey(), entry.getValue());
                 }
 
-                before = adjustCase(before);
+                before = mapKeyCaseConvert(before, caseSensitive, 
recordKeyDuplicateErrMsg(before));
                 records.add(
                         new RichCdcMultiplexRecord(
                                 databaseName,
@@ -301,7 +303,7 @@ public class CanalRecordParser extends RecordParser {
                                 new CdcRecord(RowKind.DELETE, before)));
             }
 
-            after = adjustCase(after);
+            after = mapKeyCaseConvert(after, caseSensitive, 
recordKeyDuplicateErrMsg(after));
             records.add(
                     new RichCdcMultiplexRecord(
                             databaseName,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
index 8d735b504..4704ee66e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -41,6 +41,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
@@ -93,7 +95,7 @@ public class OggRecordParser extends RecordParser {
             JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> 
records) {
         LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
         Map<String, String> rowData = extractRow(jsonNode, paimonFieldTypes);
-        rowData = adjustCase(rowData);
+        rowData = mapKeyCaseConvert(rowData, caseSensitive, 
recordKeyDuplicateErrMsg(rowData));
         records.add(createRecord(rowKind, rowData, paimonFieldTypes));
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index 5870db179..4b18bc832 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
@@ -34,13 +35,15 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -158,36 +161,21 @@ public class MongoDBActionUtils {
     static Schema buildPaimonSchema(
             MongodbSchema mongodbSchema,
             List<String> specifiedPartitionKeys,
-            Map<String, String> paimonConfig,
+            Map<String, String> tableConfig,
             boolean caseSensitive) {
-
-        Schema.Builder builder = Schema.newBuilder().options(paimonConfig);
-
-        Map<String, DataType> mongodbFields =
-                caseSensitive
-                        ? mongodbSchema.fields()
-                        : mongodbSchema.fields().entrySet().stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                entry -> 
entry.getKey().toLowerCase(),
-                                                Map.Entry::getValue,
-                                                (existing, replacement) -> {
-                                                    throw new 
IllegalArgumentException(
-                                                            String.format(
-                                                                    "Duplicate 
key '%s' in table '%s' appears when converting fields map keys to 
case-insensitive form.",
-                                                                    existing,
-                                                                    
mongodbSchema.tableName()));
-                                                },
-                                                LinkedHashMap::new));
-
-        mongodbFields.forEach(builder::column);
-
-        builder.primaryKey(Lists.newArrayList(PRIMARY_KEY));
-
-        if (!specifiedPartitionKeys.isEmpty()) {
-            builder.partitionKeys(specifiedPartitionKeys);
-        }
-
-        return builder.build();
+        LinkedHashMap<String, DataType> sourceColumns =
+                mapKeyCaseConvert(
+                        mongodbSchema.fields(),
+                        caseSensitive,
+                        columnDuplicateErrMsg(mongodbSchema.tableName()));
+
+        return CdcActionCommonUtils.buildPaimonSchema(
+                specifiedPartitionKeys,
+                Lists.newArrayList(PRIMARY_KEY),
+                Collections.emptyList(),
+                tableConfig,
+                sourceColumns,
+                null,
+                Collections.emptyList());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 75a687387..8ec535f19 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
@@ -42,7 +43,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -74,7 +74,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
     private final Map<String, String> tableConfig;
     @Nullable private final Pattern includingPattern;
     @Nullable private final Pattern excludingPattern;
-    @Nullable private final String includingTables;
+    private final String includingTables;
 
     public MongoDBSyncDatabaseAction(
             Map<String, String> mongodbConfig,
@@ -112,7 +112,11 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
 
         MongoDBSource<String> source =
                 MongoDBActionUtils.buildMongodbSource(
-                        mongodbConfig, buildTableList(excludedTables));
+                        mongodbConfig,
+                        CdcActionCommonUtils.combinedModeTableList(
+                                
mongodbConfig.get(MongoDBSourceOptions.DATABASE),
+                                includingTables,
+                                excludedTables));
 
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
         RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
@@ -164,46 +168,6 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
                         tableSuffix));
     }
 
-    private String buildTableList(List<Identifier> excludedTables) {
-        String separatorRex = "\\.";
-        // In COMBINED mode, we should consider both existed tables and 
possible newly added
-        // tables, so we should use regular expression to monitor all valid 
tables and exclude
-        // certain invalid tables
-
-        // The table list is built by template:
-        // (?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
-
-        // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
-        // is exactly equal to 'db.tbl'
-        // The including pattern databasePattern\\.(including_pattern1|...) 
can include tables
-        // whose qualified name matches one of the patterns
-
-        // a table can be monitored only when its name meets the including 
pattern and doesn't
-        // be excluded by excluding pattern at the same time
-        String includingPattern =
-                String.format(
-                        "%s%s(%s)",
-                        mongodbConfig.get(MongoDBSourceOptions.DATABASE),
-                        separatorRex,
-                        includingTables);
-        if (excludedTables.isEmpty()) {
-            return includingPattern;
-        }
-
-        String excludingPattern =
-                excludedTables.stream()
-                        .map(
-                                t ->
-                                        String.format(
-                                                "(^%s$)",
-                                                t.getDatabaseName()
-                                                        + separatorRex
-                                                        + t.getObjectName()))
-                        .collect(Collectors.joining("|"));
-        excludingPattern = "?!" + excludingPattern;
-        return String.format("(%s)(%s)", excludingPattern, includingPattern);
-    }
-
     @VisibleForTesting
     public Map<String, String> tableConfig() {
         return tableConfig;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
index 8c70ba4c1..c266c163f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -34,7 +34,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
@@ -57,13 +56,13 @@ public class MongodbSchema {
     private static final String ID_FIELD = "_id";
     private final String databaseName;
     private final String tableName;
-    private final Map<String, DataType> fields;
+    private final LinkedHashMap<String, DataType> fields;
     private final List<String> primaryKeys;
 
     public MongodbSchema(
             String databaseName,
             String tableName,
-            Map<String, DataType> fields,
+            LinkedHashMap<String, DataType> fields,
             List<String> primaryKeys) {
         this.databaseName = databaseName;
         this.tableName = tableName;
@@ -79,7 +78,7 @@ public class MongodbSchema {
         return databaseName;
     }
 
-    public Map<String, DataType> fields() {
+    public LinkedHashMap<String, DataType> fields() {
         return fields;
     }
 
@@ -105,7 +104,8 @@ public class MongodbSchema {
 
     private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration 
mongodbConfig) {
         String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
-        Map<String, DataType> schemaFields = 
generateSchemaFields(Arrays.asList(columnNames));
+        LinkedHashMap<String, DataType> schemaFields =
+                generateSchemaFields(Arrays.asList(columnNames));
         String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
         String collectionName = 
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
         return new MongodbSchema(
@@ -129,8 +129,8 @@ public class MongodbSchema {
         return document != null ? new ArrayList<>(document.keySet()) : 
Collections.emptyList();
     }
 
-    private static Map<String, DataType> generateSchemaFields(List<String> 
columnNames) {
-        Map<String, DataType> schemaFields = new LinkedHashMap<>();
+    private static LinkedHashMap<String, DataType> 
generateSchemaFields(List<String> columnNames) {
+        LinkedHashMap<String, DataType> schemaFields = new LinkedHashMap<>();
         for (String columnName : columnNames) {
             schemaFields.put(columnName, DataTypes.STRING());
         }
@@ -139,9 +139,11 @@ public class MongodbSchema {
 
     private static MongodbSchema createMongodbSchema(
             String databaseName, String collectionName, List<String> 
columnNames) {
-        Map<String, DataType> schemaFields = generateSchemaFields(columnNames);
         return new MongodbSchema(
-                databaseName, collectionName, schemaFields, 
Collections.singletonList(ID_FIELD));
+                databaseName,
+                collectionName,
+                generateSchemaFields(columnNames),
+                Collections.singletonList(ID_FIELD));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index ffda81304..609e698c8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -39,10 +39,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.PARSER_PATH;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Interface for processing strategies tailored for different MongoDB 
versions. */
 public interface MongoVersionStrategy {
@@ -71,19 +72,6 @@ public interface MongoVersionStrategy {
         return JsonSerdeUtil.parseJsonMap(record, String.class);
     }
 
-    default Map<String, String> keyCaseInsensitive(Map<String, String> origin) 
{
-        Map<String, String> keyCaseInsensitive = new HashMap<>();
-        for (Map.Entry<String, String> entry : origin.entrySet()) {
-            String fieldName = entry.getKey().toLowerCase();
-            checkArgument(
-                    !keyCaseInsensitive.containsKey(fieldName),
-                    "Duplicate key appears when converting map keys to 
case-insensitive form. Original map is:\n%s",
-                    origin);
-            keyCaseInsensitive.put(fieldName, entry.getValue());
-        }
-        return keyCaseInsensitive;
-    }
-
     /**
      * Determines the extraction mode and retrieves the row accordingly.
      *
@@ -104,22 +92,23 @@ public interface MongoVersionStrategy {
                 
SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
         ObjectNode objectNode = (ObjectNode) 
OBJECT_MAPPER.readTree(jsonNode.asText());
         JsonNode document = objectNode.set("_id", 
objectNode.get("_id").get("$oid"));
+        Map<String, String> row;
         switch (mode) {
             case SPECIFIED:
-                Map<String, String> specifiedRow =
+                row =
                         parseFieldsFromJsonRecord(
                                 document.toString(),
                                 mongodbConfig.getString(PARSER_PATH),
                                 mongodbConfig.getString(FIELD_NAME),
                                 paimonFieldTypes);
-                return caseSensitive ? specifiedRow : 
keyCaseInsensitive(specifiedRow);
+                break;
             case DYNAMIC:
-                Map<String, String> dynamicRow =
-                        parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes);
-                return caseSensitive ? dynamicRow : 
keyCaseInsensitive(dynamicRow);
+                row = parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes);
+                break;
             default:
                 throw new RuntimeException("Unsupported extraction mode: " + 
mode);
         }
+        return mapKeyCaseConvert(row, caseSensitive, 
recordKeyDuplicateErrMsg(row));
     }
 
     /**
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 73e83a253..e8f3e7c3f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -19,17 +19,14 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchema;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
-import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
-import org.apache.paimon.utils.Pair;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
@@ -44,8 +41,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -59,15 +54,16 @@ import java.util.Properties;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Utils for MySQL Action. * */
+/** Utils for MySQL Action. */
 public class MySqlActionUtils {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlActionUtils.class);
     public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
             ConfigOptions.key("scan.newly-added-table.enabled")
                     .booleanType()
@@ -132,106 +128,29 @@ public class MySqlActionUtils {
         return mySqlSchemasInfo;
     }
 
-    static void assertSchemaCompatible(TableSchema paimonSchema, Schema 
mySqlSchema) {
-        if (!schemaCompatible(paimonSchema, mySqlSchema)) {
-            throw new IllegalArgumentException(
-                    "Paimon schema and MySQL schema are not compatible.\n"
-                            + "Paimon fields are: "
-                            + paimonSchema.fields()
-                            + ".\nMySQL fields are: "
-                            + mySqlSchema.fields());
-        }
-    }
-
-    static boolean schemaCompatible(TableSchema paimonSchema, Schema 
mySqlSchema) {
-        for (DataField field : mySqlSchema.fields()) {
-            int idx = paimonSchema.fieldNames().indexOf(field.name());
-            if (idx < 0) {
-                LOG.info("Cannot find field '{}' in Paimon table.", 
field.name());
-                return false;
-            }
-            DataType type = paimonSchema.fields().get(idx).type();
-            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
-                    != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) 
{
-                LOG.info(
-                        "Cannot convert field '{}' from MySQL type '{}' to 
Paimon type '{}'.",
-                        field.name(),
-                        field.type(),
-                        type);
-                return false;
-            }
-        }
-        return true;
-    }
-
     static Schema buildPaimonSchema(
             MySqlTableInfo mySqlTableInfo,
             List<String> specifiedPartitionKeys,
             List<String> specifiedPrimaryKeys,
             List<ComputedColumn> computedColumns,
-            Map<String, String> paimonConfig,
+            Map<String, String> tableConfig,
             boolean caseSensitive) {
-        Schema.Builder builder = Schema.newBuilder();
-        builder.options(paimonConfig);
         MySqlSchema mySqlSchema = mySqlTableInfo.schema();
+        LinkedHashMap<String, DataType> sourceColumns =
+                mapKeyCaseConvert(
+                        mySqlSchema.typeMapping(),
+                        caseSensitive,
+                        columnDuplicateErrMsg(mySqlTableInfo.location()));
+        List<String> primaryKeys = listCaseConvert(mySqlSchema.primaryKeys(), 
caseSensitive);
 
-        // build columns and primary keys from mySqlSchema
-        LinkedHashMap<String, Pair<DataType, String>> mySqlFields;
-        List<String> mySqlPrimaryKeys;
-        if (caseSensitive) {
-            mySqlFields = mySqlSchema.fields();
-            mySqlPrimaryKeys = mySqlSchema.primaryKeys();
-        } else {
-            mySqlFields = new LinkedHashMap<>();
-            for (Map.Entry<String, Pair<DataType, String>> entry :
-                    mySqlSchema.fields().entrySet()) {
-                String fieldName = entry.getKey();
-                checkArgument(
-                        !mySqlFields.containsKey(fieldName.toLowerCase()),
-                        String.format(
-                                "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
-                                fieldName, mySqlTableInfo.location()));
-                mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
-            }
-            mySqlPrimaryKeys =
-                    mySqlSchema.primaryKeys().stream()
-                            .map(String::toLowerCase)
-                            .collect(Collectors.toList());
-        }
-
-        for (Map.Entry<String, Pair<DataType, String>> entry : 
mySqlFields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue().getLeft(), 
entry.getValue().getRight());
-        }
-
-        for (ComputedColumn computedColumn : computedColumns) {
-            builder.column(computedColumn.columnName(), 
computedColumn.columnType());
-        }
-
-        if (specifiedPrimaryKeys.size() > 0) {
-            for (String key : specifiedPrimaryKeys) {
-                if (!mySqlFields.containsKey(key)
-                        && computedColumns.stream().noneMatch(c -> 
c.columnName().equals(key))) {
-                    throw new IllegalArgumentException(
-                            "Specified primary key "
-                                    + key
-                                    + " does not exist in MySQL tables or 
computed columns.");
-                }
-            }
-            builder.primaryKey(specifiedPrimaryKeys);
-        } else if (mySqlPrimaryKeys.size() > 0) {
-            builder.primaryKey(mySqlPrimaryKeys);
-        } else {
-            throw new IllegalArgumentException(
-                    "Primary keys are not specified. "
-                            + "Also, can't infer primary keys from MySQL table 
schemas because "
-                            + "MySQL tables have no primary keys or have 
different primary keys.");
-        }
-
-        if (specifiedPartitionKeys.size() > 0) {
-            builder.partitionKeys(specifiedPartitionKeys);
-        }
-
-        return builder.build();
+        return CdcActionCommonUtils.buildPaimonSchema(
+                specifiedPartitionKeys,
+                specifiedPrimaryKeys,
+                computedColumns,
+                tableConfig,
+                sourceColumns,
+                mySqlSchema.comments(),
+                primaryKeys);
     }
 
     static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig, 
String tableList) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 498e5f099..4bdfc3671 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -74,9 +74,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link EventParser} for MySQL Debezium JSON. */
 public class MySqlDebeziumJsonEventParser implements EventParser<String> {
@@ -284,13 +285,13 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
         Map<String, String> before = extractRow(payload.get("before"));
         if (before.size() > 0) {
-            before = caseSensitive ? before : keyCaseInsensitive(before);
+            before = mapKeyCaseConvert(before, caseSensitive, 
recordKeyDuplicateErrMsg(before));
             records.add(new CdcRecord(RowKind.DELETE, before));
         }
 
         Map<String, String> after = extractRow(payload.get("after"));
         if (after.size() > 0) {
-            after = caseSensitive ? after : keyCaseInsensitive(after);
+            after = mapKeyCaseConvert(after, caseSensitive, 
recordKeyDuplicateErrMsg(after));
             records.add(new CdcRecord(RowKind.INSERT, after));
         }
 
@@ -461,19 +462,6 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
         return resultMap;
     }
 
-    private Map<String, String> keyCaseInsensitive(Map<String, String> origin) 
{
-        Map<String, String> keyCaseInsensitive = new HashMap<>();
-        for (Map.Entry<String, String> entry : origin.entrySet()) {
-            String fieldName = entry.getKey().toLowerCase();
-            checkArgument(
-                    !keyCaseInsensitive.containsKey(fieldName),
-                    "Duplicate key appears when converting map keys to 
case-insensitive form. Original map is:\n%s",
-                    origin);
-            keyCaseInsensitive.put(fieldName, entry.getValue());
-        }
-        return keyCaseInsensitive;
-    }
-
     private boolean shouldSynchronizeCurrentTable() {
         if (excludedTables.contains(currentTable)) {
             return false;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index d18755dd3..66eaa6614 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -56,8 +56,9 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.schemaCompatible;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.tableList;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -254,7 +255,14 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         + "MySQL database are not compatible with those of 
existed Paimon tables. Please check the log.");
 
         MySqlSource<String> source =
-                MySqlActionUtils.buildMySqlSource(mySqlConfig, 
buildTableList());
+                MySqlActionUtils.buildMySqlSource(
+                        mySqlConfig,
+                        tableList(
+                                mode,
+                                
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
+                                includingTables,
+                                monitoredTables,
+                                excludedTables));
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
@@ -338,7 +346,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
     private boolean shouldMonitorTable(
             TableSchema tableSchema, Schema mySqlSchema, Supplier<String> 
errMsg) {
-        if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
+        if (schemaCompatible(tableSchema, mySqlSchema.fields())) {
             return true;
         } else if (ignoreIncompatible) {
             LOG.warn(errMsg.get() + "This table will be ignored.");
@@ -383,59 +391,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         return tableConfig;
     }
 
-    /**
-     * See {@link 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
-     * and {@code MySqlSyncDatabaseTableListITCase}.
-     */
-    private String buildTableList() {
-        String separatorRex = "\\.";
-        if (mode == DIVIDED) {
-            // In DIVIDED mode, we only concern about existed tables
-            return monitoredTables.stream()
-                    .map(t -> t.getDatabaseName() + separatorRex + 
t.getObjectName())
-                    .collect(Collectors.joining("|"));
-        } else if (mode == COMBINED) {
-            // In COMBINED mode, we should consider both existed tables and 
possible newly created
-            // tables, so we should use regular expression to monitor all 
valid tables and exclude
-            // certain invalid tables
-
-            // The table list is built by template:
-            // 
(?!(^db\\.tbl$)|(^...$))((databasePattern)\\.(including_pattern1|...))
-
-            // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
-            // is exactly equal to 'db.tbl'
-            // The including pattern 
(databasePattern)\\.(including_pattern1|...) can include tables
-            // whose qualified name matches one of the patterns
-
-            // a table can be monitored only when its name meets the including 
pattern and doesn't
-            // be excluded by excluding pattern at the same time
-            String includingPattern =
-                    String.format(
-                            "(%s)%s(%s)",
-                            mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
-                            separatorRex,
-                            includingTables);
-            if (excludedTables.isEmpty()) {
-                return includingPattern;
-            }
-
-            String excludingPattern =
-                    excludedTables.stream()
-                            .map(
-                                    t ->
-                                            String.format(
-                                                    "(^%s$)",
-                                                    t.getDatabaseName()
-                                                            + separatorRex
-                                                            + 
t.getObjectName()))
-                            .collect(Collectors.joining("|"));
-            excludingPattern = "?!" + excludingPattern;
-            return String.format("(%s)(%s)", excludingPattern, 
includingPattern);
-        }
-
-        throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " + 
mode);
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index aef0597ec..3fca52255 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
@@ -189,7 +190,7 @@ public class MySqlSyncTableAction extends ActionBase {
                         computedFields,
                         fieldNames);
             }
-            MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
+            CdcActionCommonUtils.assertSchemaCompatible(table.schema(), 
fromMySql.fields());
         } catch (Catalog.TableNotExistException e) {
             catalog.createTable(identifier, fromMySql, false);
             table = (FileStoreTable) catalog.getTable(identifier);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 3779d1101..888bbaf73 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -31,10 +31,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
 public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode> {
@@ -81,22 +82,8 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
             primaryKeys.add(primary.asText());
         }
 
-        if (!caseSensitive) {
-            LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
-            for (Map.Entry<String, DataType> entry : fields.entrySet()) {
-                String fieldName = entry.getKey();
-                checkArgument(
-                        !tmp.containsKey(fieldName.toLowerCase()),
-                        "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
-                        fieldName,
-                        tableName);
-                tmp.put(fieldName.toLowerCase(), entry.getValue());
-            }
-            fields = tmp;
-
-            primaryKeys =
-                    
primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
-        }
+        fields = mapKeyCaseConvert(fields, caseSensitive, 
columnDuplicateErrMsg(tableName));
+        primaryKeys = listCaseConvert(primaryKeys, caseSensitive);
 
         Schema.Builder builder = Schema.newBuilder();
         builder.options(tableConfig);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 5ca2d877b..310fe53ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -28,7 +28,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,12 +93,18 @@ public class MySqlSchema {
         return primaryKeys;
     }
 
-    public Map<String, DataType> typeMapping() {
-        Map<String, DataType> typeMapping = new HashMap<>();
+    public LinkedHashMap<String, DataType> typeMapping() {
+        LinkedHashMap<String, DataType> typeMapping = new LinkedHashMap<>();
         fields.forEach((name, pair) -> typeMapping.put(name, pair.getLeft()));
         return typeMapping;
     }
 
+    public List<String> comments() {
+        List<String> comments = new ArrayList<>();
+        fields.forEach((name, pair) -> comments.add(pair.getRight()));
+        return comments;
+    }
+
     public MySqlSchema merge(String currentTable, String otherTable, 
MySqlSchema other) {
         for (Map.Entry<String, Pair<DataType, String>> entry : 
other.fields.entrySet()) {
             String fieldName = entry.getKey();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 520917dc1..6b50274e9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -643,9 +643,9 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
                 .satisfies(
                         anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "Paimon schema and Kafka schema are not 
compatible.\n"
+                                "Paimon schema and source table schema are not 
compatible.\n"
                                         + "Paimon fields are: [`k` STRING NOT 
NULL, `v1` STRING].\n"
-                                        + "Kafka fields are: [`pt` INT NOT 
NULL, `_id` INT NOT NULL, `v1` VARCHAR(10)]"));
+                                        + "Source table fields are: [`pt` INT 
NOT NULL, `_id` INT NOT NULL, `v1` VARCHAR(10)]"));
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
index 380582c1b..3bebaddd3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
@@ -202,9 +202,9 @@ public class KafkaOggSyncTableActionITCase extends 
KafkaActionITCaseBase {
                 .satisfies(
                         anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "Paimon schema and Kafka schema are not 
compatible.\n"
+                                "Paimon schema and source table schema are not 
compatible.\n"
                                         + "Paimon fields are: [`k` STRING NOT 
NULL, `v1` STRING].\n"
-                                        + "Kafka fields are: [`id` STRING NOT 
NULL, `name` STRING, `description` STRING, `weight` STRING]"));
+                                        + "Source table fields are: [`id` 
STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]"));
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 99c6b91d5..a22b568f0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -644,11 +644,11 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                 .satisfies(
                         anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "Paimon schema and MySQL schema are not 
compatible."));
+                                "Paimon schema and source table schema are not 
compatible."));
     }
 
     @Test
-    public void testInvalidPrimaryKey() {
+    public void testInvalidPrimaryKey() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "schema_evolution_\\d+");
@@ -660,7 +660,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                 .satisfies(
                         anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "Specified primary key pk does not exist in 
MySQL tables or computed columns."));
+                                "Specified primary key 'pk' does not exist in 
source tables or computed columns."));
     }
 
     @Test
@@ -676,8 +676,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         anyCauseMatches(
                                 IllegalArgumentException.class,
                                 "Primary keys are not specified. "
-                                        + "Also, can't infer primary keys from 
MySQL table schemas because "
-                                        + "MySQL tables have no primary keys 
or have different primary keys."));
+                                        + "Also, can't infer primary keys from 
source table schemas because "
+                                        + "source tables have no primary keys 
or have different primary keys."));
     }
 
     @Test

Reply via email to