This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae1dde16b [cdc] Optimize Duplicate Field Detection in MySQL CDC Schema Building. (#2078)
ae1dde16b is described below
commit ae1dde16b87f8d28c9be72d0b31e760850dc900e
Author: Kerwin <[email protected]>
AuthorDate: Thu Sep 28 12:47:11 2023 +0800
[cdc] Optimize Duplicate Field Detection in MySQL CDC Schema Building. (#2078)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 5 +++--
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 22 +++++++++++-----------
2 files changed, 14 insertions(+), 13 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 19e3283fc..1561d9ede 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -140,8 +140,9 @@ public class CdcActionCommonUtils {
if (caseSensitive) {
return column;
}
- checkArgument(existedFields.add(column.toLowerCase()),
columnDuplicateErrMsg.apply(column));
- return column.toLowerCase();
+ String columnLowerCase = column.toLowerCase();
+ checkArgument(existedFields.add(columnLowerCase),
columnDuplicateErrMsg.apply(column));
+ return columnLowerCase;
}
public static Schema buildPaimonSchema(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 24edbc66f..25ef2fe37 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -27,15 +27,17 @@ import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<TableChanges.TableChange> {
@@ -57,8 +59,10 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<TableChang
String tableName = tableChange.getId().toString();
List<Column> columns = table.columns();
+ Set<String> existedFields = new HashSet<>();
+ Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(tableName);
+
Schema.Builder builder = Schema.newBuilder();
- Map<String, Integer> duplicateFields = new HashMap<>();
// column
for (Column column : columns) {
@@ -71,16 +75,12 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<TableChang
dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) ||
column.isOptional());
- String columnName = column.name();
- if (!caseSensitive) {
- checkArgument(
- !duplicateFields.containsKey(columnName.toLowerCase()),
- columnDuplicateErrMsg(tableName).apply(columnName));
- columnName = columnName.toLowerCase();
- }
+ String columnName =
+ columnCaseConvertAndDuplicateCheck(
+ column.name(), existedFields, caseSensitive,
columnDuplicateErrMsg);
+
// TODO : add table comment and column comment when we upgrade
flink cdc to 2.4
builder.column(columnName, dataType, null);
- duplicateFields.put(columnName, 1);
}
// primaryKey