This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1869a7d0 [improve](cdc) add jdbc properties configuration (#450)
1869a7d0 is described below

commit 1869a7d02c02a08399bd2c0d27e75a2e9b176793
Author: Petrichor <[email protected]>
AuthorDate: Mon Aug 12 15:12:59 2024 +0800

    [improve](cdc) add jdbc properties configuration (#450)
    
    In the previous version, we were unable to configure the JDBC URL 
properties for Postgres, SqlServer, and Db2, which might cause some connection 
issues in certain scenarios.
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 21 +++++
 .../doris/flink/tools/cdc/db2/Db2DatabaseSync.java | 21 ++++-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 21 +----
 .../tools/cdc/postgres/PostgresDatabaseSync.java   | 27 +++++-
 .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 14 +++-
 .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java    |  4 +
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  3 +
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  2 +
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |  3 +
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    | 97 ++++++++++++++++++++++
 10 files changed, 189 insertions(+), 24 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e565d0a7..2f672adb 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -59,6 +59,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;
+
 public abstract class DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(DatabaseSync.class);
     private static final String TABLE_NAME_OPTIONS = "table-name";
@@ -483,6 +485,25 @@ public abstract class DatabaseSync {
         }
     }
 
+    protected Properties getJdbcProperties() {
+        Properties jdbcProps = new Properties();
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(PROPERTIES_PREFIX)) {
+                jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), 
value);
+            }
+        }
+        return jdbcProps;
+    }
+
+    protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties 
jdbcProperties) {
+        StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+        jdbcProperties.forEach(
+                (key, value) -> 
jdbcUrlBuilder.append("&").append(key).append("=").append(value));
+        return jdbcUrlBuilder.toString();
+    }
+
     public DatabaseSync setEnv(StreamExecutionEnvironment env) {
         this.env = env;
         return this;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
index 2dcd21a9..3947c1e1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
@@ -89,9 +89,11 @@ public class Db2DatabaseSync extends DatabaseSync {
 
     @Override
     public Connection getConnection() throws SQLException {
+        Properties jdbcProperties = getJdbcProperties();
+        String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
         String jdbcUrl =
                 String.format(
-                        JDBC_URL,
+                        jdbcUrlTemplate,
                         config.get(JdbcSourceOptions.HOSTNAME),
                         config.get(PORT),
                         config.get(JdbcSourceOptions.DATABASE_NAME));
@@ -224,4 +226,21 @@ public class Db2DatabaseSync extends DatabaseSync {
     public String getTableListPrefix() {
         return config.get(JdbcSourceOptions.SCHEMA_NAME);
     }
+
+    @Override
+    protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties 
jdbcProperties) {
+        StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+        boolean firstParam = true;
+        for (Map.Entry<Object, Object> entry : jdbcProperties.entrySet()) {
+            Object key = entry.getKey();
+            Object value = entry.getValue();
+            if (firstParam) {
+                
jdbcUrlBuilder.append(":").append(key).append("=").append(value).append(";");
+                firstParam = false;
+            } else {
+                
jdbcUrlBuilder.append(key).append("=").append(value).append(";");
+            }
+        }
+        return jdbcUrlBuilder.toString();
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index a58008cd..9fdfdcae 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -55,10 +55,11 @@ import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;
+
 public class MysqlDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(MysqlDatabaseSync.class);
     private static final String JDBC_URL = 
"jdbc:mysql://%s:%d?useInformationSchema=true";
-    private static final String PROPERTIES_PREFIX = "jdbc.properties.";
 
     public MysqlDatabaseSync() throws SQLException {
         super();
@@ -83,12 +84,10 @@ public class MysqlDatabaseSync extends DatabaseSync {
     @Override
     public Connection getConnection() throws SQLException {
         Properties jdbcProperties = getJdbcProperties();
-        StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL);
-        jdbcProperties.forEach(
-                (key, value) -> 
jdbcUrlSb.append("&").append(key).append("=").append(value));
+        String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
         String jdbcUrl =
                 String.format(
-                        jdbcUrlSb.toString(),
+                        jdbcUrlTemplate,
                         config.get(MySqlSourceOptions.HOSTNAME),
                         config.get(MySqlSourceOptions.PORT));
 
@@ -269,16 +268,4 @@ public class MysqlDatabaseSync extends DatabaseSync {
         }
         return chunkMap;
     }
-
-    private Properties getJdbcProperties() {
-        Properties jdbcProps = new Properties();
-        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-            if (key.startsWith(PROPERTIES_PREFIX)) {
-                jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), 
value);
-            }
-        }
-        return jdbcProps;
-    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 66e26d15..15fc632b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -66,7 +66,7 @@ import static 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSou
 public class PostgresDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(PostgresDatabaseSync.class);
 
-    private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
+    private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s?";
 
     public PostgresDatabaseSync() throws SQLException {
         super();
@@ -84,9 +84,11 @@ public class PostgresDatabaseSync extends DatabaseSync {
 
     @Override
     public Connection getConnection() throws SQLException {
+        Properties jdbcProperties = getJdbcProperties();
+        String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
         String jdbcUrl =
                 String.format(
-                        JDBC_URL,
+                        jdbcUrlTemplate,
                         config.get(PostgresSourceOptions.HOSTNAME),
                         config.get(PostgresSourceOptions.PG_PORT),
                         config.get(PostgresSourceOptions.DATABASE_NAME));
@@ -227,7 +229,24 @@ public class PostgresDatabaseSync extends DatabaseSync {
 
     @Override
     public String getTableListPrefix() {
-        String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
-        return schemaName;
+        return config.get(PostgresSourceOptions.SCHEMA_NAME);
+    }
+
+    @Override
+    protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties 
jdbcProperties) {
+
+        if (!initialJdbcUrl.startsWith("?")) {
+            return super.getJdbcUrlTemplate(initialJdbcUrl, jdbcProperties);
+        }
+        StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+        int recordIndex = 0;
+        for (Map.Entry<Object, Object> entry : jdbcProperties.entrySet()) {
+            
jdbcUrlBuilder.append(entry.getKey()).append("=").append(entry.getValue());
+            if (recordIndex < jdbcProperties.size() - 1) {
+                jdbcUrlBuilder.append("&");
+                recordIndex++;
+            }
+        }
+        return jdbcUrlBuilder.toString();
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index 08c54dd3..cb6b6682 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -63,7 +63,7 @@ import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_K
 
 public class SqlServerDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(SqlServerDatabaseSync.class);
-    private static final String JDBC_URL = 
"jdbc:sqlserver://%s:%d;database=%s";
+    private static final String JDBC_URL = 
"jdbc:sqlserver://%s:%d;database=%s;";
     private static final String PORT = "port";
 
     public SqlServerDatabaseSync() throws SQLException {
@@ -82,9 +82,11 @@ public class SqlServerDatabaseSync extends DatabaseSync {
 
     @Override
     public Connection getConnection() throws SQLException {
+        Properties jdbcProperties = getJdbcProperties();
+        String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
         String jdbcUrl =
                 String.format(
-                        JDBC_URL,
+                        jdbcUrlTemplate,
                         config.get(JdbcSourceOptions.HOSTNAME),
                         config.getInteger(PORT, 1433),
                         config.get(JdbcSourceOptions.DATABASE_NAME));
@@ -216,4 +218,12 @@ public class SqlServerDatabaseSync extends DatabaseSync {
     public String getTableListPrefix() {
         return config.get(JdbcSourceOptions.SCHEMA_NAME);
     }
+
+    @Override
+    public String getJdbcUrlTemplate(String initialJdbcUrl, Properties 
jdbcProperties) {
+        StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+        jdbcProperties.forEach(
+                (key, value) -> 
jdbcUrlBuilder.append(key).append("=").append(value).append(";"));
+        return jdbcUrlBuilder.toString();
+    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 0327079a..0666cb9d 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -49,6 +49,10 @@ public class CdcDb2SyncDatabaseCase {
         sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
         sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
         
sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
+        // add jdbc properties configuration
+        sourceConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1");
+        sourceConfig.put("jdbc.properties.resultSetHoldability", "1");
+        sourceConfig.put("jdbc.properties.SSL", "false");
 
         Configuration config = Configuration.fromMap(sourceConfig);
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index e0c0b828..c430ea87 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -53,6 +53,8 @@ public class CdcMysqlSyncDatabaseCase {
         mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");
         mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root");
         mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "12345678");
+        // add jdbc properties for MySQL
+        mysqlConfig.put("jdbc.properties.use_ssl", "false");
         Configuration config = Configuration.fromMap(mysqlConfig);
 
         Map<String, String> sinkConfig = new HashMap<>();
@@ -61,6 +63,7 @@ public class CdcMysqlSyncDatabaseCase {
         sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "");
         sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), 
"jdbc:mysql://10.20.30.1:9030");
         sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), 
UUID.randomUUID().toString());
+        sinkConfig.put("sink.enable-delete", "false");
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index f1e61e72..99892e02 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -49,6 +49,8 @@ public class CdcPostgresSyncDatabaseCase {
         sourceConfig.put(PostgresSourceOptions.PG_PORT.key(), "5432");
         sourceConfig.put(PostgresSourceOptions.USERNAME.key(), "postgres");
         sourceConfig.put(PostgresSourceOptions.PASSWORD.key(), "123456");
+        // add jdbc properties configuration
+        sourceConfig.put("jdbc.properties.ssl", "false");
         // 
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
         // sourceConfig.put("scan.incremental.snapshot.enabled","true");
         // sourceConfig.put("debezium.include.schema.changes","false");
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 4e343239..ca6a3121 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -47,6 +47,9 @@ public class CdcSqlServerSyncDatabaseCase {
         sourceConfig.put(DatabaseSyncConfig.PORT, "1433");
         sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "sa");
         sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "Passw@rd");
+        // add jdbc properties configuration
+        sourceConfig.put("jdbc.properties.encrypt", "false");
+        sourceConfig.put("jdbc.properties.integratedSecurity", "false");
         // 
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
         // sourceConfig.put("scan.incremental.snapshot.enabled","true");
         // sourceConfig.put("debezium.include.schema.changes","false");
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index f0cd0a51..859a8720 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -20,16 +20,22 @@ package org.apache.doris.flink.tools.cdc;
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
 import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -169,4 +175,95 @@ public class DatabaseSyncTest {
         assertFalse("ssb_test.dates".matches(syncTableListPattern));
         assertFalse("ssb_test.lineorder".matches(syncTableListPattern));
     }
+
+    @Test
+    public void getJdbcPropertiesTest() throws Exception {
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("jdbc.properties.use_ssl", "false");
+
+        Configuration config = Configuration.fromMap(mysqlConfig);
+        databaseSync.setConfig(config);
+        Properties jdbcProperties = databaseSync.getJdbcProperties();
+        Assert.assertEquals(1, jdbcProperties.size());
+        Assert.assertEquals("false", jdbcProperties.getProperty("use_ssl"));
+    }
+
+    @Test
+    public void getJdbcUrlTemplateTest() throws SQLException {
+        String mysqlJdbcTemplate = 
"jdbc:mysql://%s:%d?useInformationSchema=true";
+        String postgresJdbcTemplate = "jdbc:postgresql://%s:%d/%s?";
+        String sqlServerJdbcTemplate = "jdbc:sqlserver://%s:%d;database=%s;";
+        String db2JdbcTemplate = "jdbc:db2://%s:%d/%s";
+
+        // mysql jdbc properties configuration
+        DatabaseSync mysqlDatabaseSync = new MysqlDatabaseSync();
+        Map<String, String> mysqlJdbcConfig = new LinkedHashMap<>();
+        mysqlJdbcConfig.put("jdbc.properties.use_ssl", "false");
+
+        DatabaseSync postgresDatabaseSync = new PostgresDatabaseSync();
+        Map<String, String> postgresJdbcConfig = new LinkedHashMap<>();
+        postgresJdbcConfig.put("jdbc.properties.ssl", "false");
+
+        DatabaseSync sqlServerDatabaseSync = new SqlServerDatabaseSync();
+        Map<String, String> sqlServerJdbcConfig = new LinkedHashMap<>();
+        sqlServerJdbcConfig.put("jdbc.properties.encrypt", "false");
+        sqlServerJdbcConfig.put("jdbc.properties.integratedSecurity", "false");
+
+        DatabaseSync db2DatabaseSync = new Db2DatabaseSync();
+        Map<String, String> db2JdbcConfig = new LinkedHashMap<>();
+        db2JdbcConfig.put("jdbc.properties.ssl", "false");
+        db2JdbcConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", 
"1");
+        db2JdbcConfig.put("jdbc.properties.resultSetHoldability", "1");
+
+        Configuration mysqlConfig = Configuration.fromMap(mysqlJdbcConfig);
+        mysqlDatabaseSync.setConfig(mysqlConfig);
+
+        Configuration postgresConfig = 
Configuration.fromMap(postgresJdbcConfig);
+        postgresDatabaseSync.setConfig(postgresConfig);
+
+        Configuration sqlServerConfig = 
Configuration.fromMap(sqlServerJdbcConfig);
+        sqlServerDatabaseSync.setConfig(sqlServerConfig);
+
+        Configuration db2Config = Configuration.fromMap(db2JdbcConfig);
+        db2DatabaseSync.setConfig(db2Config);
+
+        Properties mysqlJdbcProperties = mysqlDatabaseSync.getJdbcProperties();
+        Assert.assertEquals(1, mysqlJdbcProperties.size());
+        Assert.assertEquals("false", 
mysqlJdbcProperties.getProperty("use_ssl"));
+        String mysqlJdbcUrlTemplate =
+                mysqlDatabaseSync.getJdbcUrlTemplate(mysqlJdbcTemplate, 
mysqlJdbcProperties);
+        Assert.assertEquals(mysqlJdbcTemplate + "&use_ssl=false", 
mysqlJdbcUrlTemplate);
+
+        Properties postgresJdbcProperties = 
postgresDatabaseSync.getJdbcProperties();
+        Assert.assertEquals(1, postgresJdbcProperties.size());
+        Assert.assertEquals("false", 
postgresJdbcProperties.getProperty("ssl"));
+        String postgresJdbcUrlTemplate =
+                postgresDatabaseSync.getJdbcUrlTemplate(
+                        postgresJdbcTemplate, postgresJdbcProperties);
+        Assert.assertEquals(postgresJdbcTemplate + "&ssl=false", 
postgresJdbcUrlTemplate);
+
+        Properties sqlServerJdbcProperties = 
sqlServerDatabaseSync.getJdbcProperties();
+        Assert.assertEquals(2, sqlServerJdbcProperties.size());
+        Assert.assertEquals("false", 
sqlServerJdbcProperties.getProperty("encrypt"));
+        Assert.assertEquals("false", 
sqlServerJdbcProperties.getProperty("integratedSecurity"));
+        String sqlServerJdbcUrlTemplate =
+                sqlServerDatabaseSync.getJdbcUrlTemplate(
+                        sqlServerJdbcTemplate, sqlServerJdbcProperties);
+        Assert.assertEquals(
+                sqlServerJdbcTemplate + 
"encrypt=false;integratedSecurity=false;",
+                sqlServerJdbcUrlTemplate);
+
+        Properties db2JdbcProperties = db2DatabaseSync.getJdbcProperties();
+        Assert.assertEquals(3, db2JdbcProperties.size());
+        Assert.assertEquals("false", db2JdbcProperties.getProperty("ssl"));
+        Assert.assertEquals("1", 
db2JdbcProperties.getProperty("allowNextOnExhaustedResultSet"));
+        Assert.assertEquals("1", 
db2JdbcProperties.getProperty("resultSetHoldability"));
+        String db2JdbcUrlTemplate =
+                db2DatabaseSync.getJdbcUrlTemplate(db2JdbcTemplate, 
db2JdbcProperties);
+        Assert.assertEquals(
+                db2JdbcTemplate
+                        + 
":allowNextOnExhaustedResultSet=1;ssl=false;resultSetHoldability=1;",
+                db2JdbcUrlTemplate);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to