This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 59ab82c2417748af43edaad7868794aa29d3571e Author: yuzelin <[email protected]> AuthorDate: Sun Apr 23 16:17:14 2023 +0800 [hotfix] Refactor column field name duplication check for MySqlSchema constructor (#977) --- docs/content/how-to/cdc-ingestion.md | 2 ++ .../cdc/mysql/MySqlDebeziumJsonEventParser.java | 11 ++++----- .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 28 +++++++++++++++++++--- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index 19315d896..afa198652 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -123,6 +123,8 @@ To use this feature through `flink run`, run the following shell command. * `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations. * `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. All Paimon sink table will be applied the same set of configurations. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations. +Only tables with primary keys will be synchronized. + For each MySQL table to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table. Its schema will be derived from all specified MySQL tables. If the Paimon table already exists, its schema will be compared against the schema of all specified MySQL tables. This action supports a limited number of schema changes. Unsupported schema changes will be ignored. Currently supported schema changes includes: diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index 0be722ed1..4de4e2c02 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -46,8 +46,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** * {@link EventParser} for MySQL Debezium JSON. * @@ -290,10 +288,11 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> { Map<String, String> keyCaseInsensitive = new HashMap<>(); for (Map.Entry<String, String> entry : origin.entrySet()) { String fieldName = entry.getKey().toLowerCase(); - checkArgument( - !keyCaseInsensitive.containsKey(fieldName), - "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n." - + origin); + if (keyCaseInsensitive.containsKey(fieldName)) { + LOG.warn( + "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n{}", + origin); + } keyCaseInsensitive.put(fieldName, entry.getValue()); } return keyCaseInsensitive; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java index 7533795d5..c8ad3d358 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java @@ -21,18 +21,25 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.types.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Utility class to load MySQL table schema with JDBC. */ public class MySqlSchema { + private static final Logger LOG = LoggerFactory.getLogger(MySqlSchema.class); + // used for retrieving metadata and throwing error, do not convert to case-insensitive form private final String databaseName; private final String originalTableName; @@ -49,12 +56,25 @@ public class MySqlSchema { this.originalTableName = tableName; this.tableName = caseSensitive ? tableName : tableName.toLowerCase(); + Set<String> originalFields = new HashSet<>(); fields = new LinkedHashMap<>(); try (ResultSet rs = metaData.getColumns(null, databaseName, tableName, null)) { while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); String fieldType = rs.getString("TYPE_NAME"); Integer precision = rs.getInt("COLUMN_SIZE"); + + // in some cases the #getColumns will return primary keys twice (unknown issue) + if (originalFields.contains(fieldName)) { + LOG.warn( + "Duplicate field found: '{}'.\nDebug information: MySQL version is {}; JDBC Driver version is {}", + fieldName, + metaData.getDatabaseProductVersion(), + metaData.getDriverVersion()); + continue; + } + originalFields.add(fieldName); + if (rs.wasNull()) { precision = null; } @@ -63,10 +83,12 @@ public class MySqlSchema { scale = null; } if (!caseSensitive) { - fieldName = fieldName.toLowerCase(); checkArgument( - !fields.containsKey(fieldName), - "Duplicate key appears when converting fields map keys to case-insensitive form."); + !fields.containsKey(fieldName.toLowerCase()), + String.format( + "Duplicate key '%s' in table '%s.%s' appears when converting fields map keys to case-insensitive form.", + fieldName, databaseName, originalTableName)); + fieldName = fieldName.toLowerCase(); } fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, precision, scale)); }
