This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a1093123 [cdc] Handle case-insensitive in separate operator instead of record parsers (#3551)
6a1093123 is described below

commit 6a1093123ad3457ca34205352b3362367a9ab3f9
Author: yuzelin <[email protected]>
AuthorDate: Thu Jun 20 11:22:42 2024 +0800

    [cdc] Handle case-insensitive in separate operator instead of record parsers (#3551)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     | 131 +++++----------------
 .../flink/action/cdc/MessageQueueSchemaUtils.java  |   3 +-
 .../flink/action/cdc/SyncDatabaseActionBase.java   |   2 +-
 .../paimon/flink/action/cdc/SyncJobHandler.java    |  17 +--
 .../flink/action/cdc/SyncTableActionBase.java      |   3 +-
 .../paimon/flink/action/cdc/format/DataFormat.java |   5 +-
 .../flink/action/cdc/format/RecordParser.java      |  22 +---
 .../action/cdc/format/RecordParserFactory.java     |   4 +-
 .../action/cdc/format/canal/CanalRecordParser.java |   5 +-
 .../cdc/format/debezium/DebeziumRecordParser.java  |   5 +-
 .../action/cdc/format/json/JsonRecordParser.java   |   5 +-
 .../cdc/format/maxwell/MaxwellRecordParser.java    |   5 +-
 .../action/cdc/format/ogg/OggRecordParser.java     |   5 +-
 .../action/cdc/mongodb/MongoDBRecordParser.java    |  12 +-
 .../mongodb/strategy/Mongo4VersionStrategy.java    |  11 --
 .../flink/action/cdc/mysql/MySqlRecordParser.java  |  22 +---
 .../action/cdc/postgres/PostgresRecordParser.java  |  33 +-----
 .../paimon/flink/sink/cdc/CaseSensitiveUtils.java  |  75 ++++++++++++
 .../paimon/flink/sink/cdc/CdcMultiplexRecord.java  |   4 +
 .../apache/paimon/flink/sink/cdc/CdcRecord.java    |  14 ++-
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |  13 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  18 ++-
 .../flink/sink/cdc/NewTableSchemaBuilder.java      |  29 ++---
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |  20 ++--
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |  12 +-
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 119 +++++++++++--------
 .../database/case-insensitive/canal-data-1.txt     |   1 +
 .../test/resources/mysql/sync_database_setup.sql   |  19 ++-
 28 files changed, 293 insertions(+), 321 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 3c5d586db..bc307cca4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -31,20 +31,19 @@ import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
 
 /** Common utils for CDC Action. */
 public class CdcActionCommonUtils {
@@ -100,95 +99,12 @@ public class CdcActionCommonUtils {
         return true;
     }
 
-    public static List<DataField> fieldNameCaseConvert(
-            List<DataField> origin, boolean caseSensitive, String tableName) {
-        Set<String> existedFields = new HashSet<>();
-        Function<String, String> columnDuplicateErrMsg =
-                columnDuplicateErrMsg(tableName == null ? "UNKNOWN" : 
tableName);
-        return origin.stream()
-                .map(
-                        field -> {
-                            if (caseSensitive) {
-                                return field;
-                            }
-                            String columnLowerCase = 
field.name().toLowerCase();
-                            if (!existedFields.add(columnLowerCase)) {
-                                throw new IllegalArgumentException(
-                                        
columnDuplicateErrMsg.apply(field.name()));
-                            }
-                            return field.newName(columnLowerCase);
-                        })
-                .collect(Collectors.toList());
-    }
-
-    public static <T> LinkedHashMap<String, T> mapKeyCaseConvert(
-            LinkedHashMap<String, T> origin,
-            boolean caseSensitive,
-            Function<String, String> duplicateErrMsg) {
-        return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg, 
LinkedHashMap::new);
-    }
-
-    public static <T> Map<String, T> mapKeyCaseConvert(
-            Map<String, T> origin,
-            boolean caseSensitive,
-            Function<String, String> duplicateErrMsg) {
-        return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg, 
HashMap::new);
-    }
-
-    private static <T, M extends Map<String, T>> M mapKeyCaseConvert(
-            M origin,
-            boolean caseSensitive,
-            Function<String, String> duplicateErrMsg,
-            Supplier<M> mapSupplier) {
-        if (caseSensitive) {
-            return origin;
-        } else {
-            M newMap = mapSupplier.get();
-            for (Map.Entry<String, T> entry : origin.entrySet()) {
-                String key = entry.getKey();
-                if (newMap.containsKey(key.toLowerCase())) {
-                    throw new 
IllegalArgumentException(duplicateErrMsg.apply(key));
-                }
-                newMap.put(key.toLowerCase(), entry.getValue());
-            }
-            return newMap;
-        }
-    }
-
-    public static Function<String, String> columnDuplicateErrMsg(String 
tableName) {
-        return column ->
-                String.format(
-                        "Failed to convert columns of table '%s' to 
case-insensitive form because duplicate column found: '%s'.",
-                        tableName, column);
-    }
-
-    public static Function<String, String> 
recordKeyDuplicateErrMsg(Map<String, String> record) {
-        return column ->
-                "Failed to convert record map to case-insensitive form because 
duplicate column found. Original record map is:\n"
-                        + record;
-    }
-
     public static List<String> listCaseConvert(List<String> origin, boolean 
caseSensitive) {
         return caseSensitive
                 ? origin
                 : 
origin.stream().map(String::toLowerCase).collect(Collectors.toList());
     }
 
-    public static String columnCaseConvertAndDuplicateCheck(
-            String column,
-            Set<String> existedFields,
-            boolean caseSensitive,
-            Function<String, String> columnDuplicateErrMsg) {
-        if (caseSensitive) {
-            return column;
-        }
-        String columnLowerCase = column.toLowerCase();
-        if (!existedFields.add(columnLowerCase)) {
-            throw new 
IllegalArgumentException(columnDuplicateErrMsg.apply(column));
-        }
-        return columnLowerCase;
-    }
-
     public static Schema buildPaimonSchema(
             String tableName,
             List<String> specifiedPartitionKeys,
@@ -206,36 +122,30 @@ public class CdcActionCommonUtils {
         builder.options(sourceSchema.options());
 
         // fields
-        Set<String> existedFields = new HashSet<>();
-        Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(tableName);
+        List<String> allFieldNames = new ArrayList<>();
 
         for (DataField field : sourceSchema.fields()) {
-            String fieldName =
-                    columnCaseConvertAndDuplicateCheck(
-                            field.name(), existedFields, caseSensitive, 
columnDuplicateErrMsg);
+            String fieldName = caseSensitiveConversion(field.name(), 
caseSensitive);
+            allFieldNames.add(fieldName);
             builder.column(fieldName, field.type(), field.description());
         }
 
         for (ComputedColumn computedColumn : computedColumns) {
             String computedColumnName =
-                    columnCaseConvertAndDuplicateCheck(
-                            computedColumn.columnName(),
-                            existedFields,
-                            caseSensitive,
-                            columnDuplicateErrMsg);
+                    caseSensitiveConversion(computedColumn.columnName(), 
caseSensitive);
+            allFieldNames.add(computedColumnName);
             builder.column(computedColumnName, computedColumn.columnType());
         }
 
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
             String metadataColumnName =
-                    columnCaseConvertAndDuplicateCheck(
-                            metadataConverter.columnName(),
-                            existedFields,
-                            caseSensitive,
-                            columnDuplicateErrMsg);
+                    caseSensitiveConversion(metadataConverter.columnName(), 
caseSensitive);
+            allFieldNames.add(metadataColumnName);
             builder.column(metadataColumnName, metadataConverter.dataType());
         }
 
+        checkDuplicateFields(tableName, allFieldNames);
+
         // primary keys
         if (!specifiedPrimaryKeys.isEmpty()) {
             Set<String> sourceColumns =
@@ -272,6 +182,21 @@ public class CdcActionCommonUtils {
         return builder.build();
     }
 
+    public static void checkDuplicateFields(String tableName, List<String> 
fieldNames) {
+        List<String> duplicates =
+                fieldNames.stream()
+                        .filter(name -> Collections.frequency(fieldNames, 
name) > 1)
+                        .collect(Collectors.toList());
+        checkState(
+                duplicates.isEmpty(),
+                "Table %s contains duplicate columns: %s.\n"
+                        + "Possible causes are: "
+                        + "1. computed columns or metadata columns contain 
duplicate fields; "
+                        + "2. the catalog is case-insensitive and the table 
columns duplicate after they are all converted to lower-case.",
+                tableName,
+                duplicates);
+    }
+
     public static String tableList(
             MultiTablesSinkMode mode,
             String databasePattern,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index 73e63bcd0..c517670e4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -49,8 +49,7 @@ public class MessageQueueSchemaUtils {
         int retry = 0;
         int retryInterval = 1000;
 
-        RecordParser recordParser =
-                dataFormat.createParser(true, typeMapping, 
Collections.emptyList());
+        RecordParser recordParser = dataFormat.createParser(typeMapping, 
Collections.emptyList());
 
         while (true) {
             Optional<Schema> schema =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4fa6ca76c..4c7f8ea27 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -112,7 +112,7 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
recordParse() {
         return syncJobHandler.provideRecordParser(
-                caseSensitive, Collections.emptyList(), typeMapping, 
metadataConverters);
+                Collections.emptyList(), typeMapping, metadataConverters);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index c674e560b..6116d8ba5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -193,31 +193,22 @@ public class SyncJobHandler {
     }
 
     public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
provideRecordParser(
-            boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TypeMapping typeMapping,
             CdcMetadataConverter[] metadataConverters) {
         switch (sourceType) {
             case MYSQL:
                 return new MySqlRecordParser(
-                        cdcSourceConfig,
-                        caseSensitive,
-                        computedColumns,
-                        typeMapping,
-                        metadataConverters);
+                        cdcSourceConfig, computedColumns, typeMapping, 
metadataConverters);
             case POSTGRES:
                 return new PostgresRecordParser(
-                        cdcSourceConfig,
-                        caseSensitive,
-                        computedColumns,
-                        typeMapping,
-                        metadataConverters);
+                        cdcSourceConfig, computedColumns, typeMapping, 
metadataConverters);
             case KAFKA:
             case PULSAR:
                 DataFormat dataFormat = provideDataFormat();
-                return dataFormat.createParser(caseSensitive, typeMapping, 
computedColumns);
+                return dataFormat.createParser(typeMapping, computedColumns);
             case MONGODB:
-                return new MongoDBRecordParser(caseSensitive, computedColumns, 
cdcSourceConfig);
+                return new MongoDBRecordParser(computedColumns, 
cdcSourceConfig);
             default:
                 throw new UnsupportedOperationException("Unknown source type " 
+ sourceType);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index f4e5bfe6a..1a847d3a0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -156,8 +156,7 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
 
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
recordParse() {
-        return syncJobHandler.provideRecordParser(
-                caseSensitive, computedColumns, typeMapping, 
metadataConverters);
+        return syncJobHandler.provideRecordParser(computedColumns, 
typeMapping, metadataConverters);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index e7e588e4e..2af44c902 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -53,13 +53,12 @@ public enum DataFormat {
      * Creates a new instance of {@link RecordParser} for this data format 
with the specified
      * configurations.
      *
-     * @param caseSensitive Indicates whether the parser should be 
case-sensitive.
      * @param computedColumns List of computed columns to be considered by the 
parser.
      * @return A new instance of {@link RecordParser}.
      */
     public RecordParser createParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        return parser.createParser(caseSensitive, typeMapping, 
computedColumns);
+            TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+        return parser.createParser(typeMapping, computedColumns);
     }
 
     public static DataFormat fromConfigString(String format) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 38076428f..ea9fa5492 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -51,10 +51,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static org.apache.paimon.utils.JsonSerdeUtil.convertValue;
 import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
 import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
@@ -75,15 +71,12 @@ public abstract class RecordParser
 
     protected static final String FIELD_TABLE = "table";
     protected static final String FIELD_DATABASE = "database";
-    private final boolean caseSensitive;
     protected final TypeMapping typeMapping;
     protected final List<ComputedColumn> computedColumns;
 
     protected JsonNode root;
 
-    public RecordParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        this.caseSensitive = caseSensitive;
+    public RecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
         this.typeMapping = typeMapping;
         this.computedColumns = computedColumns;
     }
@@ -203,15 +196,12 @@ public abstract class RecordParser
     /** Handle case sensitivity here. */
     private RichCdcMultiplexRecord createRecord(
             RowKind rowKind, Map<String, String> data, List<DataField> 
paimonFields) {
-        String databaseName = getDatabaseName();
-        String tableName = getTableName();
-        paimonFields = fieldNameCaseConvert(paimonFields, caseSensitive, 
tableName);
-
-        data = mapKeyCaseConvert(data, caseSensitive, 
recordKeyDuplicateErrMsg(data));
-        List<String> primaryKeys = listCaseConvert(extractPrimaryKeys(), 
caseSensitive);
-
         return new RichCdcMultiplexRecord(
-                databaseName, tableName, paimonFields, primaryKeys, new 
CdcRecord(rowKind, data));
+                getDatabaseName(),
+                getTableName(),
+                paimonFields,
+                extractPrimaryKeys(),
+                new CdcRecord(rowKind, data));
     }
 
     protected void setRoot(CdcSourceRecord record) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
index 29c3aafb2..612dfefd6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
@@ -37,11 +37,9 @@ public interface RecordParserFactory {
     /**
      * Creates a new instance of {@link RecordParser} with the specified 
configurations.
      *
-     * @param caseSensitive Indicates whether the parser should be 
case-sensitive.
      * @param typeMapping Data type mapping options.
      * @param computedColumns List of computed columns to be considered by the 
parser.
      * @return A new instance of {@link RecordParser}.
      */
-    RecordParser createParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns);
+    RecordParser createParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns);
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 642dc4241..76671739a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -82,9 +82,8 @@ public class CanalRecordParser extends RecordParser {
         return !isNull(node) && node.asBoolean();
     }
 
-    public CanalRecordParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, computedColumns);
+    public CanalRecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
+        super(typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index ec23132ff..c2b658754 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -83,9 +83,8 @@ public class DebeziumRecordParser extends RecordParser {
     private final Map<String, String> classNames = new HashMap<>();
     private final Map<String, Map<String, String>> parameters = new 
HashMap<>();
 
-    public DebeziumRecordParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, computedColumns);
+    public DebeziumRecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
+        super(typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
index e7f30c0b0..1a67f82f4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
@@ -36,9 +36,8 @@ import java.util.List;
  */
 public class JsonRecordParser extends RecordParser {
 
-    public JsonRecordParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, computedColumns);
+    public JsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
+        super(typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
index 7379f81a5..0c9393ee6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
@@ -50,9 +50,8 @@ public class MaxwellRecordParser extends RecordParser {
     private static final String OP_UPDATE = "update";
     private static final String OP_DELETE = "delete";
 
-    public MaxwellRecordParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, computedColumns);
+    public MaxwellRecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
+        super(typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
index 019ae1562..bc1efdf7c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
@@ -58,9 +58,8 @@ public class OggRecordParser extends RecordParser {
     private static final String OP_INSERT = "I";
     private static final String OP_DELETE = "D";
 
-    public OggRecordParser(
-            boolean caseSensitive, TypeMapping typeMapping, 
List<ComputedColumn> computedColumns) {
-        super(caseSensitive, typeMapping, computedColumns);
+    public OggRecordParser(TypeMapping typeMapping, List<ComputedColumn> 
computedColumns) {
+        super(typeMapping, computedColumns);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index 9264e409f..251fd93bb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -60,15 +60,10 @@ public class MongoDBRecordParser
     private static final String FIELD_NAMESPACE = "ns";
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private final List<ComputedColumn> computedColumns;
-    private final boolean caseSensitive;
     private final Configuration mongodbConfig;
     private JsonNode root;
 
-    public MongoDBRecordParser(
-            boolean caseSensitive,
-            List<ComputedColumn> computedColumns,
-            Configuration mongodbConfig) {
-        this.caseSensitive = caseSensitive;
+    public MongoDBRecordParser(List<ComputedColumn> computedColumns, 
Configuration mongodbConfig) {
         this.computedColumns = computedColumns;
         this.mongodbConfig = mongodbConfig;
     }
@@ -81,7 +76,7 @@ public class MongoDBRecordParser
         String collection = extractString(FIELD_TABLE);
         MongoVersionStrategy versionStrategy =
                 VersionStrategyFactory.create(
-                        databaseName, collection, caseSensitive, 
computedColumns, mongodbConfig);
+                        databaseName, collection, computedColumns, 
mongodbConfig);
         versionStrategy.extractRecords(root).forEach(out::collect);
     }
 
@@ -93,7 +88,6 @@ public class MongoDBRecordParser
         static MongoVersionStrategy create(
                 String databaseName,
                 String collection,
-                boolean caseSensitive,
                 List<ComputedColumn> computedColumns,
                 Configuration mongodbConfig) {
             // TODO: When MongoDB CDC is upgraded to 2.5, uncomment the 
version check logic
@@ -101,7 +95,7 @@ public class MongoDBRecordParser
             //     return new Mongo6VersionStrategy(databaseName, collection, 
caseSensitive);
             // }
             return new Mongo4VersionStrategy(
-                    databaseName, collection, caseSensitive, computedColumns, 
mongodbConfig);
+                    databaseName, collection, computedColumns, mongodbConfig);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index e64cf3e8a..450eacaef 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -34,10 +34,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
-
 /**
  * Implementation class for extracting records from MongoDB versions greater 
than 4.x and less than
  * 6.x.
@@ -53,19 +49,16 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
     private static final String OP_DELETE = "delete";
     private final String databaseName;
     private final String collection;
-    private final boolean caseSensitive;
     private final Configuration mongodbConfig;
     private final List<ComputedColumn> computedColumns;
 
     public Mongo4VersionStrategy(
             String databaseName,
             String collection,
-            boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             Configuration mongodbConfig) {
         this.databaseName = databaseName;
         this.collection = collection;
-        this.caseSensitive = caseSensitive;
         this.mongodbConfig = mongodbConfig;
         this.computedColumns = computedColumns;
     }
@@ -133,11 +126,7 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
         RowType.Builder rowTypeBuilder = RowType.builder();
         Map<String, String> record =
                 getExtractRow(fullDocument, rowTypeBuilder, computedColumns, 
mongodbConfig);
-
-        record = mapKeyCaseConvert(record, caseSensitive, 
recordKeyDuplicateErrMsg(record));
-
         List<DataField> fields = rowTypeBuilder.build().getFields();
-        fields = fieldNameCaseConvert(fields, caseSensitive, collection);
 
         return new RichCdcMultiplexRecord(
                 databaseName,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 745a7932d..3f54db541 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -58,13 +58,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
 
@@ -78,7 +72,6 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final ZoneId serverTimeZone;
-    private final boolean caseSensitive;
     private final List<ComputedColumn> computedColumns;
     private final TypeMapping typeMapping;
 
@@ -93,11 +86,9 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
     public MySqlRecordParser(
             Configuration mySqlConfig,
-            boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TypeMapping typeMapping,
             CdcMetadataConverter[] metadataConverters) {
-        this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
         this.typeMapping = typeMapping;
         this.metadataConverters = metadataConverters;
@@ -162,7 +153,7 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
         Table table = tableChange.getTable();
 
         List<DataField> fields = extractFields(table);
-        List<String> primaryKeys = 
listCaseConvert(table.primaryKeyColumnNames(), caseSensitive);
+        List<String> primaryKeys = table.primaryKeyColumnNames();
 
         // TODO : add table comment and column comment when we upgrade flink 
cdc to 2.4
         return Collections.singletonList(
@@ -173,15 +164,8 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
     private List<DataField> extractFields(Table table) {
         RowType.Builder rowType = RowType.builder();
         List<Column> columns = table.columns();
-        Set<String> existedFields = new HashSet<>();
-        Function<String, String> columnDuplicateErrMsg =
-                columnDuplicateErrMsg(table.id().toString());
 
         for (Column column : columns) {
-            String columnName =
-                    columnCaseConvertAndDuplicateCheck(
-                            column.name(), existedFields, caseSensitive, 
columnDuplicateErrMsg);
-
             DataType dataType =
                     MySqlTypeUtils.toDataType(
                             column.typeExpression(),
@@ -190,7 +174,7 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
                             typeMapping);
             dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || 
column.isOptional());
 
-            rowType.field(columnName, dataType);
+            rowType.field(column.name(), dataType);
         }
         return rowType.build().getFields();
     }
@@ -203,13 +187,11 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
         Map<String, String> before = extractRow(root.payload().before());
         if (!before.isEmpty()) {
-            before = mapKeyCaseConvert(before, caseSensitive, 
recordKeyDuplicateErrMsg(before));
             records.add(createRecord(RowKind.DELETE, before));
         }
 
         Map<String, String> after = extractRow(root.payload().after());
         if (!after.isEmpty()) {
-            after = mapKeyCaseConvert(after, caseSensitive, 
recordKeyDuplicateErrMsg(after));
             records.add(createRecord(RowKind.INSERT, after));
         }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 0299d57f7..0d672e85d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -64,17 +64,10 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 import static 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
@@ -91,7 +84,6 @@ public class PostgresRecordParser
 
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final ZoneId serverTimeZone;
-    private final boolean caseSensitive;
     private final List<ComputedColumn> computedColumns;
     private final TypeMapping typeMapping;
 
@@ -104,24 +96,9 @@ public class PostgresRecordParser
 
     public PostgresRecordParser(
             Configuration postgresConfig,
-            boolean caseSensitive,
-            TypeMapping typeMapping,
-            CdcMetadataConverter[] metadataConverters) {
-        this(
-                postgresConfig,
-                caseSensitive,
-                Collections.emptyList(),
-                typeMapping,
-                metadataConverters);
-    }
-
-    public PostgresRecordParser(
-            Configuration postgresConfig,
-            boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TypeMapping typeMapping,
             CdcMetadataConverter[] metadataConverters) {
-        this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
         this.typeMapping = typeMapping;
         this.metadataConverters = metadataConverters;
@@ -155,20 +132,14 @@ public class PostgresRecordParser
                         + "in the JsonDebeziumDeserializationSchema you 
created");
 
         RowType.Builder rowType = RowType.builder();
-        Set<String> existedFields = new HashSet<>();
-        Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(currentTable);
         afterFields.forEach(
                 (key, value) -> {
-                    String columnName =
-                            columnCaseConvertAndDuplicateCheck(
-                                    key, existedFields, caseSensitive, 
columnDuplicateErrMsg);
-
                     DataType dataType = extractFieldType(value);
                     dataType =
                             dataType.copy(
                                     typeMapping.containsMode(TO_NULLABLE) || 
value.optional());
 
-                    rowType.field(columnName, dataType);
+                    rowType.field(key, dataType);
                 });
         return rowType.build().getFields();
     }
@@ -241,13 +212,11 @@ public class PostgresRecordParser
 
         Map<String, String> before = extractRow(root.payload().before());
         if (!before.isEmpty()) {
-            before = mapKeyCaseConvert(before, caseSensitive, 
recordKeyDuplicateErrMsg(before));
             records.add(createRecord(RowKind.DELETE, before));
         }
 
         Map<String, String> after = extractRow(root.payload().after());
         if (!after.isEmpty()) {
-            after = mapKeyCaseConvert(after, caseSensitive, 
recordKeyDuplicateErrMsg(after));
             List<DataField> fields = extractFields(root.schema());
             records.add(
                     new RichCdcMultiplexRecord(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
new file mode 100644
index 000000000..49890a11c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.catalog.Catalog;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+/** Add convert operator if the catalog is case-insensitive. */
+public class CaseSensitiveUtils {
+
+    public static DataStream<CdcRecord> cdcRecordConvert(
+            Catalog.Loader catalogLoader, DataStream<CdcRecord> input) {
+        if (caseSensitive(catalogLoader)) {
+            return input;
+        }
+
+        return input.forward()
+                .process(
+                        new ProcessFunction<CdcRecord, CdcRecord>() {
+                            @Override
+                            public void processElement(
+                                    CdcRecord record, Context ctx, 
Collector<CdcRecord> out) {
+                                out.collect(record.fieldNameLowerCase());
+                            }
+                        })
+                .name("Case-insensitive Convert");
+    }
+
+    public static DataStream<CdcMultiplexRecord> cdcMultiplexRecordConvert(
+            Catalog.Loader catalogLoader, DataStream<CdcMultiplexRecord> 
input) {
+        if (caseSensitive(catalogLoader)) {
+            return input;
+        }
+
+        return input.forward()
+                .process(
+                        new ProcessFunction<CdcMultiplexRecord, 
CdcMultiplexRecord>() {
+                            @Override
+                            public void processElement(
+                                    CdcMultiplexRecord record,
+                                    Context ctx,
+                                    Collector<CdcMultiplexRecord> out) {
+                                out.collect(record.fieldNameLowerCase());
+                            }
+                        })
+                .name("Case-insensitive Convert");
+    }
+
+    private static boolean caseSensitive(Catalog.Loader catalogLoader) {
+        try (Catalog catalog = catalogLoader.load()) {
+            return catalog.caseSensitive();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
index ed97907ce..ce68f300a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
@@ -55,6 +55,10 @@ public class CdcMultiplexRecord implements Serializable {
         return record;
     }
 
+    public CdcMultiplexRecord fieldNameLowerCase() {
+        return new CdcMultiplexRecord(databaseName, tableName, 
record.fieldNameLowerCase());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof CdcMultiplexRecord)) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 61d4ec7a1..9adca753d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.RowKind;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -41,11 +42,6 @@ public class CdcRecord implements Serializable {
         this.fields = fields;
     }
 
-    public CdcRecord setRowKind(RowKind kind) {
-        this.kind = kind;
-        return this;
-    }
-
     public static CdcRecord emptyRecord() {
         return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
     }
@@ -58,6 +54,14 @@ public class CdcRecord implements Serializable {
         return fields;
     }
 
+    public CdcRecord fieldNameLowerCase() {
+        Map<String, String> newFields = new HashMap<>();
+        for (Map.Entry<String, String> entry : fields.entrySet()) {
+            newFields.put(entry.getKey().toLowerCase(), entry.getValue());
+        }
+        return new CdcRecord(kind, newFields);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof CdcRecord)) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 87d42105a..f45b4f96d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -99,6 +99,7 @@ public class CdcSinkBuilder<T> {
         SingleOutputStreamOperator<CdcRecord> parsed =
                 input.forward()
                         .process(new 
CdcParsingProcessFunction<>(parserFactory))
+                        .name("Side Output")
                         .setParallelism(input.getParallelism());
 
         DataStream<Void> schemaChangeProcessFunction =
@@ -108,18 +109,22 @@ public class CdcSinkBuilder<T> {
                                 new UpdatedDataFieldsProcessFunction(
                                         new SchemaManager(dataTable.fileIO(), 
dataTable.location()),
                                         identifier,
-                                        catalogLoader));
+                                        catalogLoader))
+                        .name("Schema Evolution");
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
         schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
 
+        DataStream<CdcRecord> converted =
+                CaseSensitiveUtils.cdcRecordConvert(catalogLoader, parsed);
         BucketMode bucketMode = dataTable.bucketMode();
         switch (bucketMode) {
             case HASH_FIXED:
-                return buildForFixedBucket(parsed);
+                return buildForFixedBucket(converted);
             case HASH_DYNAMIC:
-                return new CdcDynamicBucketSink((FileStoreTable) 
table).build(parsed, parallelism);
+                return new CdcDynamicBucketSink((FileStoreTable) table)
+                        .build(converted, parallelism);
             case BUCKET_UNAWARE:
-                return buildForUnawareBucket(parsed);
+                return buildForUnawareBucket(converted);
             default:
                 throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index be13e7f67..629b7845d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -158,9 +158,12 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                 .process(new 
MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
                 .name("Schema Evolution");
 
+        DataStream<CdcMultiplexRecord> converted =
+                CaseSensitiveUtils.cdcMultiplexRecordConvert(catalogLoader, 
newlyAddedTableStream);
+
         DataStream<CdcMultiplexRecord> partitioned =
                 partition(
-                        newlyAddedTableStream,
+                        converted,
                         new CdcMultiplexRecordChannelComputer(catalogLoader),
                         parallelism);
 
@@ -186,6 +189,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         SingleOutputStreamOperator<Void> parsed =
                 input.forward()
                         .process(new 
CdcMultiTableParsingProcessFunction<>(parserFactory))
+                        .name("Side Output")
                         .setParallelism(input.getParallelism());
 
         for (FileStoreTable table : tables) {
@@ -198,7 +202,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                                     new UpdatedDataFieldsProcessFunction(
                                             new SchemaManager(table.fileIO(), 
table.location()),
                                             Identifier.create(database, 
table.name()),
-                                            catalogLoader));
+                                            catalogLoader))
+                            .name("Schema Evolution");
             schemaChangeProcessFunction.getTransformation().setParallelism(1);
             
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
 
@@ -208,16 +213,19 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                             
CdcMultiTableParsingProcessFunction.createRecordOutputTag(
                                     table.name()));
 
+            DataStream<CdcRecord> converted =
+                    CaseSensitiveUtils.cdcRecordConvert(catalogLoader, 
parsedForTable);
+
             BucketMode bucketMode = table.bucketMode();
             switch (bucketMode) {
                 case HASH_FIXED:
-                    buildForFixedBucket(table, parsedForTable);
+                    buildForFixedBucket(table, converted);
                     break;
                 case HASH_DYNAMIC:
-                    new CdcDynamicBucketSink(table).build(parsedForTable, 
parallelism);
+                    new CdcDynamicBucketSink(table).build(converted, 
parallelism);
                     break;
                 case BUCKET_UNAWARE:
-                    buildForUnawareBucket(table, parsedForTable);
+                    buildForUnawareBucket(table, converted);
                     break;
                 case CROSS_PARTITION:
                 default:
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index 795fdb0f9..cef7f011a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -23,15 +23,14 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 
 import java.io.Serializable;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkDuplicateFields;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
 
 /** Build schema for new table found in database synchronization. */
 public class NewTableSchemaBuilder implements Serializable {
@@ -53,30 +52,24 @@ public class NewTableSchemaBuilder implements Serializable {
         Schema.Builder builder = Schema.newBuilder();
         builder.options(tableConfig);
 
-        String tableName = record.tableName();
-        tableName = tableName == null ? "UNKNOWN" : tableName;
-
         // fields
-        Set<String> existedFields = new HashSet<>();
-        Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(tableName);
+        List<String> allFieldNames = new ArrayList<>();
 
         for (DataField dataField : record.fields()) {
-            String fieldName =
-                    columnCaseConvertAndDuplicateCheck(
-                            dataField.name(), existedFields, caseSensitive, 
columnDuplicateErrMsg);
+            String fieldName = caseSensitiveConversion(dataField.name(), 
caseSensitive);
+            allFieldNames.add(fieldName);
             builder.column(fieldName, dataField.type(), 
dataField.description());
         }
 
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
             String metadataColumnName =
-                    columnCaseConvertAndDuplicateCheck(
-                            metadataConverter.columnName(),
-                            existedFields,
-                            caseSensitive,
-                            columnDuplicateErrMsg);
+                    caseSensitiveConversion(metadataConverter.columnName(), 
caseSensitive);
+            allFieldNames.add(metadataColumnName);
             builder.column(metadataColumnName, metadataConverter.dataType());
         }
 
+        checkDuplicateFields(record.tableName(), allFieldNames);
+
         builder.primaryKey(listCaseConvert(record.primaryKeys(), 
caseSensitive));
 
         return Optional.of(builder.build());
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 8fefcab63..3d832d339 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -46,8 +47,10 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     private static final Logger LOG =
             
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
 
-    protected Catalog catalog;
     protected final Catalog.Loader catalogLoader;
+    protected Catalog catalog;
+    private boolean caseSensitive;
+
     private static final List<DataTypeRoot> STRING_TYPES =
             Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
     private static final List<DataTypeRoot> BINARY_TYPES =
@@ -73,6 +76,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, 
O> extends Process
     @Override
     public void open(Configuration parameters) {
         this.catalog = catalogLoader.load();
+        this.caseSensitive = this.catalog.caseSensitive();
     }
 
     protected void applySchemaChange(
@@ -198,8 +202,10 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
 
         List<SchemaChange> result = new ArrayList<>();
         for (DataField newField : updatedDataFields) {
-            if (oldFields.containsKey(newField.name())) {
-                DataField oldField = oldFields.get(newField.name());
+            String newFieldName =
+                    StringUtils.caseSensitiveConversion(newField.name(), 
caseSensitive);
+            if (oldFields.containsKey(newFieldName)) {
+                DataField oldField = oldFields.get(newFieldName);
                 // we compare by ignoring nullable, because partition keys and 
primary keys might be
                 // nullable in source database, but they can't be null in 
Paimon
                 if (oldField.type().equalsIgnoreNullable(newField.type())) {
@@ -208,23 +214,23 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                             && 
!newField.description().equals(oldField.description())) {
                         result.add(
                                 SchemaChange.updateColumnComment(
-                                        new String[] {newField.name()}, 
newField.description()));
+                                        new String[] {newFieldName}, 
newField.description()));
                     }
                 } else {
                     // update column type
-                    result.add(SchemaChange.updateColumnType(newField.name(), 
newField.type()));
+                    result.add(SchemaChange.updateColumnType(newFieldName, 
newField.type()));
                     // update column comment
                     if (newField.description() != null) {
                         result.add(
                                 SchemaChange.updateColumnComment(
-                                        new String[] {newField.name()}, 
newField.description()));
+                                        new String[] {newFieldName}, 
newField.description()));
                     }
                 }
             } else {
                 // add column
                 result.add(
                         SchemaChange.addColumn(
-                                newField.name(), newField.type(), 
newField.description(), null));
+                                newFieldName, newField.type(), 
newField.description(), null));
             }
         }
         return result;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 9ed85c71d..01cb78dae 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -542,11 +542,17 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         RowType rowType =
                 RowType.of(
                         new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.INT()
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.INT(),
+                            DataTypes.VARCHAR(10)
                         },
-                        new String[] {"k1", "v0", "v1"});
+                        new String[] {"k1", "v0", "v1", "v2"});
         waitForResult(
-                Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+                Arrays.asList(
+                        "+I[5, five, 50, NULL]",
+                        "+I[7, seven, 70, NULL]",
+                        "+I[8, eight, 80, added]"),
                 table,
                 rowType,
                 Collections.singletonList("k1"));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 5d458ae60..b1dda4736 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -27,7 +27,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.JobClient;
@@ -468,9 +467,9 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
 
     @Test
     @Timeout(60)
-    public void testIgnoreCase() throws Exception {
+    public void testIgnoreCaseDivided() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "paimon_ignore_CASE");
+        mySqlConfig.put("database-name", "paimon_ignore_CASE_divided");
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
@@ -481,55 +480,83 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        // check table schema
-        FileStoreTable table = getFileStoreTable("t");
-        assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields()))
-                .isEqualTo(
-                        "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
-                                + "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
-
-        // check sync schema changes and records
         try (Statement statement = getStatement()) {
-            statement.executeUpdate("USE paimon_ignore_CASE");
-            statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi')");
-            RowType rowType1 =
-                    RowType.of(
-                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(20)},
-                            new String[] {"k", "uppercase_v0"});
-            waitForResult(
-                    Collections.singletonList("+I[1, Hi]"),
-                    table,
-                    rowType1,
-                    Collections.singletonList("k"));
+            statement.executeUpdate("USE paimon_ignore_CASE_divided");
+            ignoreCaseTableCheck(statement, "T");
+        }
+    }
 
-            statement.executeUpdate("ALTER TABLE T MODIFY COLUMN UPPERCASE_V0 VARCHAR(30)");
-            statement.executeUpdate("INSERT INTO T VALUES (2, 'Paimon')");
-            RowType rowType2 =
-                    RowType.of(
-                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(30)},
-                            new String[] {"k", "uppercase_v0"});
-            waitForResult(
-                    Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"),
-                    table,
-                    rowType2,
-                    Collections.singletonList("k"));
+    @Test
+    @Timeout(60)
+    public void testIgnoreCaseCombined() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "paimon_ignore_CASE_combined");
 
-            statement.executeUpdate("ALTER TABLE T ADD COLUMN UPPERCASE_V1 DOUBLE");
-            statement.executeUpdate("INSERT INTO T VALUES (3, 'Test', 0.5)");
-            RowType rowType3 =
-                    RowType.of(
-                            new DataType[] {
-                                DataTypes.INT().notNull(), DataTypes.VARCHAR(30), DataTypes.DOUBLE()
-                            },
-                            new String[] {"k", "uppercase_v0", "uppercase_v1"});
-            waitForResult(
-                    Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]", "+I[3, Test, 0.5]"),
-                    table,
-                    rowType3,
-                    Collections.singletonList("k"));
+        MySqlSyncDatabaseAction action =
+                syncDatabaseActionBuilder(mySqlConfig)
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                        .withMode(COMBINED.configString())
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement()) {
+            statement.executeUpdate("USE paimon_ignore_CASE_combined");
+            ignoreCaseTableCheck(statement, "T");
+
+            // check new table
+            statement.executeUpdate(
+                    "CREATE TABLE T1 (k INT, UPPERCASE_V0 VARCHAR(20), PRIMARY KEY (k))");
+            waitingTables("t1");
+            ignoreCaseTableCheck(statement, "T1");
         }
     }
 
+    private void ignoreCaseTableCheck(Statement statement, String tableName) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName.toLowerCase());
+
+        // check sync schema changes and records
+        statement.executeUpdate("INSERT INTO " + tableName + " VALUES (1, 'Hi')");
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(20)},
+                        new String[] {"k", "uppercase_v0"});
+        waitForResult(
+                Collections.singletonList("+I[1, Hi]"),
+                table,
+                rowType1,
+                Collections.singletonList("k"));
+
+        statement.executeUpdate(
+                "ALTER TABLE " + tableName + " MODIFY COLUMN UPPERCASE_V0 VARCHAR(30)");
+        statement.executeUpdate("INSERT INTO " + tableName + " VALUES (2, 'Paimon')");
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(30)},
+                        new String[] {"k", "uppercase_v0"});
+        waitForResult(
+                Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"),
+                table,
+                rowType2,
+                Collections.singletonList("k"));
+
+        statement.executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN UPPERCASE_V1 DOUBLE");
+        statement.executeUpdate("INSERT INTO " + tableName + " VALUES (3, 'Test', 0.5)");
+        RowType rowType3 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(30), DataTypes.DOUBLE()
+                        },
+                        new String[] {"k", "uppercase_v0", "uppercase_v1"});
+        waitForResult(
+                Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]", "+I[3, Test, 0.5]"),
+                table,
+                rowType3,
+                Collections.singletonList("k"));
+    }
+
     @Test
     @Timeout(600)
     public void testNewlyAddedTables() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
index ebf431813..998ba5141 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
@@ -18,3 +18,4 @@
 
 {"data":[{"k1":"5","v0":"five","V1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
 {"data":[{"K1":"7","v0":"seven","V1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"K1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}
+{"data":[{"K1":"8","v0":"eight","V1":"80", "V2":"added"}],"database":"paimon_sync_database_affix","es":1684770074000,"id":84,"isDdl":false,"mysqlType":{"K1":"INT","v0":"VARCHAR(10)","V1":"INT","V2":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4,"v2":12},"table":"t1","ts":1684770073254,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
index bd70146be..b882e6d0a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
@@ -191,11 +191,24 @@ CREATE TABLE test (
 );
 
 -- ################################################################################
---  MySqlSyncDatabaseActionITCase#testIgnoreCase
+--  MySqlSyncDatabaseActionITCase#testIgnoreCaseDivided
 -- ################################################################################
 
-CREATE DATABASE paimon_ignore_CASE;
-USE paimon_ignore_CASE;
+CREATE DATABASE paimon_ignore_CASE_divided;
+USE paimon_ignore_CASE_divided;
+
+CREATE TABLE T (
+    k INT,
+    UPPERCASE_V0 VARCHAR(20),
+    PRIMARY KEY (k)
+);
+
+-- ################################################################################
+--  MySqlSyncDatabaseActionITCase#testIgnoreCaseCombined
+-- ################################################################################
+
+CREATE DATABASE paimon_ignore_CASE_combined;
+USE paimon_ignore_CASE_combined;
 
 CREATE TABLE T (
     k INT,

Reply via email to