This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 86740e537e6028e9ee2be78a510ee8404763e5d5 Author: yuzelin <[email protected]> AuthorDate: Mon Apr 24 18:35:08 2023 +0800 [flink][bug] Correct the use of MySQL JDBC metadata in MySQL CDC actions (#1000) --- .../src/test/resources/mysql/setup.sql | 27 ++++++++++++++++++++++ .../cdc/mysql/MySqlDebeziumJsonEventParser.java | 11 +++++---- .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 23 ++---------------- .../src/test/resources/mysql/setup.sql | 23 ++++++++++++++++++ 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql b/paimon-e2e-tests/src/test/resources/mysql/setup.sql index 10f111e21..ad1fccb54 100644 --- a/paimon-e2e-tests/src/test/resources/mysql/setup.sql +++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql @@ -61,6 +61,23 @@ CREATE TABLE t2 ( PRIMARY KEY (k1, k2) ); +-- to make sure we use JDBC Driver correctly +CREATE DATABASE paimon_sync_database1; +USE paimon_sync_database1; + +CREATE TABLE t1 ( + k INT, + v INT, + PRIMARY KEY (k) +); + +CREATE TABLE t2 ( + k1 INT, + k2 VARCHAR(10), + v1 INT, + PRIMARY KEY (k1, k2) +); + -- ################################################################################ -- MySqlIgnoreCaseE2EeTest#testSyncDatabase -- ################################################################################ @@ -73,3 +90,13 @@ CREATE TABLE T ( UPPERCASE_V0 VARCHAR(20), PRIMARY KEY (k) ); + +-- to make sure we use JDBC Driver correctly +CREATE DATABASE paimon_ignore_CASE1; +USE paimon_ignore_CASE1; + +CREATE TABLE T ( + k INT, + UPPERCASE_V0 VARCHAR(20), + PRIMARY KEY (k) +); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index 4de4e2c02..177258d95 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -46,6 +46,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** * {@link EventParser} for MySQL Debezium JSON. * @@ -288,11 +290,10 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> { Map<String, String> keyCaseInsensitive = new HashMap<>(); for (Map.Entry<String, String> entry : origin.entrySet()) { String fieldName = entry.getKey().toLowerCase(); - if (keyCaseInsensitive.containsKey(fieldName)) { - LOG.warn( - "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n{}", - origin); - } + checkArgument( + !keyCaseInsensitive.containsKey(fieldName), + "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s", + origin); keyCaseInsensitive.put(fieldName, entry.getValue()); } return keyCaseInsensitive; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java index c8ad3d358..74f4746e5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java @@ -21,25 +21,18 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.types.DataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.util.ArrayList; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Utility class to load MySQL table schema with JDBC. */ public class MySqlSchema { - private static final Logger LOG = LoggerFactory.getLogger(MySqlSchema.class); - // used for retrieving metadata and throwing error, do not convert to case-insensitive form private final String databaseName; private final String originalTableName; @@ -56,25 +49,13 @@ public class MySqlSchema { this.originalTableName = tableName; this.tableName = caseSensitive ? tableName : tableName.toLowerCase(); - Set<String> originalFields = new HashSet<>(); fields = new LinkedHashMap<>(); - try (ResultSet rs = metaData.getColumns(null, databaseName, tableName, null)) { + try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) { while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); String fieldType = rs.getString("TYPE_NAME"); Integer precision = rs.getInt("COLUMN_SIZE"); - // in some cases the #getColumns will return primary keys twice (unknown issue) - if (originalFields.contains(fieldName)) { - LOG.warn( - "Duplicate field found: '{}'.\nDebug information: MySQL version is {}; JDBC Driver version is {}", - fieldName, - metaData.getDatabaseProductVersion(), - metaData.getDriverVersion()); - continue; - } - originalFields.add(fieldName); - if (rs.wasNull()) { precision = null; } @@ -95,7 +76,7 @@ public class MySqlSchema { } primaryKeys = new ArrayList<>(); - try (ResultSet rs = metaData.getPrimaryKeys(null, databaseName, tableName)) { + try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) { while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); if (!caseSensitive) { diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql index fe3ac0ef3..0b17f85cc 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql @@ -259,3 +259,26 @@ CREATE TABLE t2 ( CREATE TABLE t3 ( v1 INT ); + +-- to make sure we use JDBC Driver correctly +CREATE DATABASE paimon_sync_database1; +USE paimon_sync_database1; + +CREATE TABLE t1 ( + k INT, + v1 VARCHAR(10), + PRIMARY KEY (k) +); + +CREATE TABLE t2 ( + k1 INT, + k2 VARCHAR(10), + v1 INT, + v2 BIGINT, + PRIMARY KEY (k1, k2) +); + +-- no primary key, should be ignored +CREATE TABLE t3 ( + v1 INT +);
