This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c211c75cf [flink][cdc] Register MySQL JDBC driver when constructing cdc action (#2255)
c211c75cf is described below
commit c211c75cf94918ebb6deebdba781445ac8aaa2fd
Author: yuzelin <[email protected]>
AuthorDate: Mon Nov 6 12:48:30 2023 +0800
[flink][cdc] Register MySQL JDBC driver when constructing cdc action (#2255)
---
.../flink/action/cdc/mysql/MySqlActionUtils.java | 19 +++++++++++++++++++
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 2 ++
.../flink/action/cdc/mysql/MySqlSyncTableAction.java | 2 ++
3 files changed, 23 insertions(+)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 2543f4bfb..b5f5047b2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -37,6 +37,8 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -56,6 +58,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Utils for MySQL Action. */
public class MySqlActionUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlActionUtils.class);
+
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
@@ -223,6 +227,21 @@ public class MySqlActionUtils {
.build();
}
+ public static void registerJdbcDriver() {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (ClassNotFoundException ex) {
+ LOG.warn(
+ "Cannot find class com.mysql.cj.jdbc.Driver. Try to load
class com.mysql.jdbc.Driver.");
+ try {
+ Class.forName("com.mysql.jdbc.Driver");
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "No suitable driver found. Cannot find class
com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver.");
+ }
+ }
+ }
+
private static void validateMySqlConfig(Configuration mySqlConfig) {
checkArgument(
mySqlConfig.get(MySqlSourceOptions.HOSTNAME) != null,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 8f1f5c830..3369626ed 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -128,6 +128,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
super(warehouse, catalogConfig);
this.database = database;
this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+
+ MySqlActionUtils.registerJdbcDriver();
}
public MySqlSyncDatabaseAction withTableConfig(Map<String, String>
tableConfig) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index bc6c99932..da349f0a0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -111,6 +111,8 @@ public class MySqlSyncTableAction extends ActionBase {
this.database = database;
this.table = table;
this.mySqlConfig = Configuration.fromMap(mySqlConfig);
+
+ MySqlActionUtils.registerJdbcDriver();
}
public MySqlSyncTableAction withPartitionKeys(String... partitionKeys) {