This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 86740e537e6028e9ee2be78a510ee8404763e5d5
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 24 18:35:08 2023 +0800

    [flink][bug] Correct the use of MySQL JDBC metadata in MySQL CDC actions (#1000)
---
 .../src/test/resources/mysql/setup.sql             | 27 ++++++++++++++++++++++
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 11 +++++----
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 23 ++----------------
 .../src/test/resources/mysql/setup.sql             | 23 ++++++++++++++++++
 4 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
index 10f111e21..ad1fccb54 100644
--- a/paimon-e2e-tests/src/test/resources/mysql/setup.sql
+++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
@@ -61,6 +61,23 @@ CREATE TABLE t2 (
     PRIMARY KEY (k1, k2)
 );
 
+-- to make sure we use JDBC Driver correctly
+CREATE DATABASE paimon_sync_database1;
+USE paimon_sync_database1;
+
+CREATE TABLE t1 (
+    k INT,
+    v INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    PRIMARY KEY (k1, k2)
+);
+
 -- ################################################################################
 --  MySqlIgnoreCaseE2EeTest#testSyncDatabase
 -- ################################################################################
@@ -73,3 +90,13 @@ CREATE TABLE T (
     UPPERCASE_V0 VARCHAR(20),
     PRIMARY KEY (k)
 );
+
+-- to make sure we use JDBC Driver correctly
+CREATE DATABASE paimon_ignore_CASE1;
+USE paimon_ignore_CASE1;
+
+CREATE TABLE T (
+    k INT,
+    UPPERCASE_V0 VARCHAR(20),
+    PRIMARY KEY (k)
+);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 4de4e2c02..177258d95 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -46,6 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * {@link EventParser} for MySQL Debezium JSON.
  *
@@ -288,11 +290,10 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
         Map<String, String> keyCaseInsensitive = new HashMap<>();
         for (Map.Entry<String, String> entry : origin.entrySet()) {
             String fieldName = entry.getKey().toLowerCase();
-            if (keyCaseInsensitive.containsKey(fieldName)) {
-                LOG.warn(
-                        "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n{}",
-                        origin);
-            }
+            checkArgument(
+                    !keyCaseInsensitive.containsKey(fieldName),
+                    "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s",
+                    origin);
             keyCaseInsensitive.put(fieldName, entry.getValue());
         }
         return keyCaseInsensitive;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index c8ad3d358..74f4746e5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -21,25 +21,18 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Utility class to load MySQL table schema with JDBC. */
 public class MySqlSchema {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MySqlSchema.class);
-
     // used for retrieving metadata and throwing error, do not convert to case-insensitive form
     private final String databaseName;
     private final String originalTableName;
@@ -56,25 +49,13 @@ public class MySqlSchema {
         this.originalTableName = tableName;
         this.tableName = caseSensitive ? tableName : tableName.toLowerCase();
 
-        Set<String> originalFields = new HashSet<>();
         fields = new LinkedHashMap<>();
-        try (ResultSet rs = metaData.getColumns(null, databaseName, tableName, null)) {
+        try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 String fieldType = rs.getString("TYPE_NAME");
                 Integer precision = rs.getInt("COLUMN_SIZE");
 
-                // in some cases the #getColumns will return primary keys twice (unknown issue)
-                if (originalFields.contains(fieldName)) {
-                    LOG.warn(
-                            "Duplicate field found: '{}'.\nDebug information: MySQL version is {}; JDBC Driver version is {}",
-                            fieldName,
-                            metaData.getDatabaseProductVersion(),
-                            metaData.getDriverVersion());
-                    continue;
-                }
-                originalFields.add(fieldName);
-
                 if (rs.wasNull()) {
                     precision = null;
                 }
@@ -95,7 +76,7 @@ public class MySqlSchema {
         }
 
         primaryKeys = new ArrayList<>();
-        try (ResultSet rs = metaData.getPrimaryKeys(null, databaseName, tableName)) {
+        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 if (!caseSensitive) {
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index fe3ac0ef3..0b17f85cc 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -259,3 +259,26 @@ CREATE TABLE t2 (
 CREATE TABLE t3 (
     v1 INT
 );
+
+-- to make sure we use JDBC Driver correctly
+CREATE DATABASE paimon_sync_database1;
+USE paimon_sync_database1;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+-- no primary key, should be ignored
+CREATE TABLE t3 (
+    v1 INT
+);

Reply via email to