This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ae1dde16b [cdc] Optimize Duplicate Field Detection in MySQL CDC Schema Building. (#2078)
ae1dde16b is described below

commit ae1dde16b87f8d28c9be72d0b31e760850dc900e
Author: Kerwin <[email protected]>
AuthorDate: Thu Sep 28 12:47:11 2023 +0800

    [cdc] Optimize Duplicate Field Detection in MySQL CDC Schema Building. (#2078)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     |  5 +++--
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  | 22 +++++++++++-----------
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 19e3283fc..1561d9ede 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -140,8 +140,9 @@ public class CdcActionCommonUtils {
         if (caseSensitive) {
             return column;
         }
-        checkArgument(existedFields.add(column.toLowerCase()), 
columnDuplicateErrMsg.apply(column));
-        return column.toLowerCase();
+        String columnLowerCase = column.toLowerCase();
+        checkArgument(existedFields.add(columnLowerCase), 
columnDuplicateErrMsg.apply(column));
+        return columnLowerCase;
     }
 
     public static Schema buildPaimonSchema(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 24edbc66f..25ef2fe37 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -27,15 +27,17 @@ import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
 
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
 public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<TableChanges.TableChange> {
@@ -57,8 +59,10 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<TableChang
         String tableName = tableChange.getId().toString();
         List<Column> columns = table.columns();
 
+        Set<String> existedFields = new HashSet<>();
+        Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(tableName);
+
         Schema.Builder builder = Schema.newBuilder();
-        Map<String, Integer> duplicateFields = new HashMap<>();
 
         // column
         for (Column column : columns) {
@@ -71,16 +75,12 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<TableChang
 
             dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || 
column.isOptional());
 
-            String columnName = column.name();
-            if (!caseSensitive) {
-                checkArgument(
-                        !duplicateFields.containsKey(columnName.toLowerCase()),
-                        columnDuplicateErrMsg(tableName).apply(columnName));
-                columnName = columnName.toLowerCase();
-            }
+            String columnName =
+                    columnCaseConvertAndDuplicateCheck(
+                            column.name(), existedFields, caseSensitive, 
columnDuplicateErrMsg);
+
             // TODO : add table comment and column comment when we upgrade 
flink cdc to 2.4
             builder.column(columnName, dataType, null);
-            duplicateFields.put(columnName, 1);
         }
 
         // primaryKey

Reply via email to