This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8d30367 add jdbcProperties options (#146)
8d30367 is described below
commit 8d30367fa9a9bfdf26ec4a2374405ab8e169c9f2
Author: wudi <[email protected]>
AuthorDate: Fri Jun 9 13:53:28 2023 +0800
add jdbcProperties options (#146)
* add use SSL
Co-authored-by: wudi <>
---
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 27 ++++++++++++++++------
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index caa975d..629e6e1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -50,18 +50,19 @@ import java.util.Properties;
public class MysqlDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(MysqlDatabaseSync.class);
+ private static String JDBC_URL =
"jdbc:mysql://%s:%d?useInformationSchema=true";
+
public MysqlDatabaseSync() {
}
@Override
public Connection getConnection() throws SQLException {
- return DriverManager.getConnection(
- String.format(
- "jdbc:mysql://%s:%d?useInformationSchema=true",
- config.get(MySqlSourceOptions.HOSTNAME),
- config.get(MySqlSourceOptions.PORT)),
- config.get(MySqlSourceOptions.USERNAME),
- config.get(MySqlSourceOptions.PASSWORD));
+ Properties jdbcProperties = getJdbcProperties();
+ StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL);
+ jdbcProperties.forEach((key, value) ->
jdbcUrlSb.append("&").append(key).append("=").append(value));
+ String jdbcUrl = String.format(jdbcUrlSb.toString(),
config.get(MySqlSourceOptions.HOSTNAME), config.get(MySqlSourceOptions.PORT));
+
+ return
DriverManager.getConnection(jdbcUrl,config.get(MySqlSourceOptions.USERNAME),config.get(MySqlSourceOptions.PASSWORD));
}
@Override
@@ -189,4 +190,16 @@ public class MysqlDatabaseSync extends DatabaseSync {
mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
return streamSource;
}
+
+ private Properties getJdbcProperties(){
+ Properties jdbcProps = new Properties();
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+
jdbcProps.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+ }
+ }
+ return jdbcProps;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]