This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c211c75cf [flink][cdc] Register MySQL JDBC driver when constructing 
cdc action (#2255)
c211c75cf is described below

commit c211c75cf94918ebb6deebdba781445ac8aaa2fd
Author: yuzelin <[email protected]>
AuthorDate: Mon Nov 6 12:48:30 2023 +0800

    [flink][cdc] Register MySQL JDBC driver when constructing cdc action (#2255)
---
 .../flink/action/cdc/mysql/MySqlActionUtils.java      | 19 +++++++++++++++++++
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java     |  2 ++
 .../flink/action/cdc/mysql/MySqlSyncTableAction.java  |  2 ++
 3 files changed, 23 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 2543f4bfb..b5f5047b2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -37,6 +37,8 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -56,6 +58,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 /** Utils for MySQL Action. */
 public class MySqlActionUtils {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlActionUtils.class);
+
     public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
             ConfigOptions.key("scan.newly-added-table.enabled")
                     .booleanType()
@@ -223,6 +227,21 @@ public class MySqlActionUtils {
                 .build();
     }
 
+    /**
+     * Ensures that a MySQL JDBC driver class is loaded.
+     *
+     * <p>Tries {@code com.mysql.cj.jdbc.Driver} first. If that class cannot be found, a warning
+     * is logged and {@code com.mysql.jdbc.Driver} is tried as a fallback.
+     *
+     * @throws RuntimeException if the fallback driver class cannot be loaded either; the
+     *     exception raised by the fallback attempt is attached as the cause
+     */
+    public static void registerJdbcDriver() {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (ClassNotFoundException ex) {
+            LOG.warn(
+                    "Cannot find class com.mysql.cj.jdbc.Driver. Try to load class com.mysql.jdbc.Driver.");
+            try {
+                Class.forName("com.mysql.jdbc.Driver");
+            } catch (Exception e) {
+                // Attach the underlying failure so the stack trace shows why loading failed.
+                throw new RuntimeException(
+                        "No suitable driver found. Cannot find class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver.",
+                        e);
+            }
+        }
+    }
+
     private static void validateMySqlConfig(Configuration mySqlConfig) {
         checkArgument(
                 mySqlConfig.get(MySqlSourceOptions.HOSTNAME) != null,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 8f1f5c830..3369626ed 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -128,6 +128,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         super(warehouse, catalogConfig);
         this.database = database;
         this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+
+        MySqlActionUtils.registerJdbcDriver();
     }
 
     public MySqlSyncDatabaseAction withTableConfig(Map<String, String> 
tableConfig) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index bc6c99932..da349f0a0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -111,6 +111,8 @@ public class MySqlSyncTableAction extends ActionBase {
         this.database = database;
         this.table = table;
         this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+
+        MySqlActionUtils.registerJdbcDriver();
     }
 
     public MySqlSyncTableAction withPartitionKeys(String... partitionKeys) {

Reply via email to