This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1869a7d0 [improve](cdc) add jdbc properties configuration (#450)
1869a7d0 is described below
commit 1869a7d02c02a08399bd2c0d27e75a2e9b176793
Author: Petrichor <[email protected]>
AuthorDate: Mon Aug 12 15:12:59 2024 +0800
[improve](cdc) add jdbc properties configuration (#450)
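Previously, additional JDBC URL properties (options prefixed with
`jdbc.properties.`) could only be configured for the MySQL source. Postgres,
SqlServer, and Db2 had no way to pass them, which could cause connection
issues in some scenarios (for example, when SSL or encryption settings must
be overridden). This commit moves the property handling into DatabaseSync so
that all four sources can append these properties to their JDBC URL.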
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 21 +++++
.../doris/flink/tools/cdc/db2/Db2DatabaseSync.java | 21 ++++-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 21 +----
.../tools/cdc/postgres/PostgresDatabaseSync.java | 27 +++++-
.../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 14 +++-
.../flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 4 +
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 3 +
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 +
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 3 +
.../doris/flink/tools/cdc/DatabaseSyncTest.java | 97 ++++++++++++++++++++++
10 files changed, 189 insertions(+), 24 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e565d0a7..2f672adb 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -59,6 +59,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
+import static
org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;
+
public abstract class DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(DatabaseSync.class);
private static final String TABLE_NAME_OPTIONS = "table-name";
@@ -483,6 +485,25 @@ public abstract class DatabaseSync {
}
}
+ protected Properties getJdbcProperties() {
+ Properties jdbcProps = new Properties();
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(PROPERTIES_PREFIX)) {
+ jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()),
value);
+ }
+ }
+ return jdbcProps;
+ }
+
+ protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties
jdbcProperties) {
+ StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+ jdbcProperties.forEach(
+ (key, value) ->
jdbcUrlBuilder.append("&").append(key).append("=").append(value));
+ return jdbcUrlBuilder.toString();
+ }
+
public DatabaseSync setEnv(StreamExecutionEnvironment env) {
this.env = env;
return this;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
index 2dcd21a9..3947c1e1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
@@ -89,9 +89,11 @@ public class Db2DatabaseSync extends DatabaseSync {
@Override
public Connection getConnection() throws SQLException {
+ Properties jdbcProperties = getJdbcProperties();
+ String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
- JDBC_URL,
+ jdbcUrlTemplate,
config.get(JdbcSourceOptions.HOSTNAME),
config.get(PORT),
config.get(JdbcSourceOptions.DATABASE_NAME));
@@ -224,4 +226,21 @@ public class Db2DatabaseSync extends DatabaseSync {
public String getTableListPrefix() {
return config.get(JdbcSourceOptions.SCHEMA_NAME);
}
+
+ @Override
+ protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties
jdbcProperties) {
+ StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+ boolean firstParam = true;
+ for (Map.Entry<Object, Object> entry : jdbcProperties.entrySet()) {
+ Object key = entry.getKey();
+ Object value = entry.getValue();
+ if (firstParam) {
+
jdbcUrlBuilder.append(":").append(key).append("=").append(value).append(";");
+ firstParam = false;
+ } else {
+
jdbcUrlBuilder.append(key).append("=").append(value).append(";");
+ }
+ }
+ return jdbcUrlBuilder.toString();
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index a58008cd..9fdfdcae 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -55,10 +55,11 @@ import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static
org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;
+
public class MysqlDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(MysqlDatabaseSync.class);
private static final String JDBC_URL =
"jdbc:mysql://%s:%d?useInformationSchema=true";
- private static final String PROPERTIES_PREFIX = "jdbc.properties.";
public MysqlDatabaseSync() throws SQLException {
super();
@@ -83,12 +84,10 @@ public class MysqlDatabaseSync extends DatabaseSync {
@Override
public Connection getConnection() throws SQLException {
Properties jdbcProperties = getJdbcProperties();
- StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL);
- jdbcProperties.forEach(
- (key, value) ->
jdbcUrlSb.append("&").append(key).append("=").append(value));
+ String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
- jdbcUrlSb.toString(),
+ jdbcUrlTemplate,
config.get(MySqlSourceOptions.HOSTNAME),
config.get(MySqlSourceOptions.PORT));
@@ -269,16 +268,4 @@ public class MysqlDatabaseSync extends DatabaseSync {
}
return chunkMap;
}
-
- private Properties getJdbcProperties() {
- Properties jdbcProps = new Properties();
- for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (key.startsWith(PROPERTIES_PREFIX)) {
- jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()),
value);
- }
- }
- return jdbcProps;
- }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 66e26d15..15fc632b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -66,7 +66,7 @@ import static
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSou
public class PostgresDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(PostgresDatabaseSync.class);
- private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
+ private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s?";
public PostgresDatabaseSync() throws SQLException {
super();
@@ -84,9 +84,11 @@ public class PostgresDatabaseSync extends DatabaseSync {
@Override
public Connection getConnection() throws SQLException {
+ Properties jdbcProperties = getJdbcProperties();
+ String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
- JDBC_URL,
+ jdbcUrlTemplate,
config.get(PostgresSourceOptions.HOSTNAME),
config.get(PostgresSourceOptions.PG_PORT),
config.get(PostgresSourceOptions.DATABASE_NAME));
@@ -227,7 +229,24 @@ public class PostgresDatabaseSync extends DatabaseSync {
@Override
public String getTableListPrefix() {
- String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
- return schemaName;
+ return config.get(PostgresSourceOptions.SCHEMA_NAME);
+ }
+
+ @Override
+ protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties
jdbcProperties) {
+
+ if (!initialJdbcUrl.startsWith("?")) {
+ return super.getJdbcUrlTemplate(initialJdbcUrl, jdbcProperties);
+ }
+ StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+ int recordIndex = 0;
+ for (Map.Entry<Object, Object> entry : jdbcProperties.entrySet()) {
+
jdbcUrlBuilder.append(entry.getKey()).append("=").append(entry.getValue());
+ if (recordIndex < jdbcProperties.size() - 1) {
+ jdbcUrlBuilder.append("&");
+ recordIndex++;
+ }
+ }
+ return jdbcUrlBuilder.toString();
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index 08c54dd3..cb6b6682 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -63,7 +63,7 @@ import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_K
public class SqlServerDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(SqlServerDatabaseSync.class);
- private static final String JDBC_URL =
"jdbc:sqlserver://%s:%d;database=%s";
+ private static final String JDBC_URL =
"jdbc:sqlserver://%s:%d;database=%s;";
private static final String PORT = "port";
public SqlServerDatabaseSync() throws SQLException {
@@ -82,9 +82,11 @@ public class SqlServerDatabaseSync extends DatabaseSync {
@Override
public Connection getConnection() throws SQLException {
+ Properties jdbcProperties = getJdbcProperties();
+ String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
- JDBC_URL,
+ jdbcUrlTemplate,
config.get(JdbcSourceOptions.HOSTNAME),
config.getInteger(PORT, 1433),
config.get(JdbcSourceOptions.DATABASE_NAME));
@@ -216,4 +218,12 @@ public class SqlServerDatabaseSync extends DatabaseSync {
public String getTableListPrefix() {
return config.get(JdbcSourceOptions.SCHEMA_NAME);
}
+
+ @Override
+ public String getJdbcUrlTemplate(String initialJdbcUrl, Properties
jdbcProperties) {
+ StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
+ jdbcProperties.forEach(
+ (key, value) ->
jdbcUrlBuilder.append(key).append("=").append(value).append(";"));
+ return jdbcUrlBuilder.toString();
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 0327079a..0666cb9d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -49,6 +49,10 @@ public class CdcDb2SyncDatabaseCase {
sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
+ // add jdbc properties configuration
+ sourceConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1");
+ sourceConfig.put("jdbc.properties.resultSetHoldability", "1");
+ sourceConfig.put("jdbc.properties.SSL", "false");
Configuration config = Configuration.fromMap(sourceConfig);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index e0c0b828..c430ea87 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -53,6 +53,8 @@ public class CdcMysqlSyncDatabaseCase {
mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");
mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root");
mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "12345678");
+ // add jdbc properties for MySQL
+ mysqlConfig.put("jdbc.properties.use_ssl", "false");
Configuration config = Configuration.fromMap(mysqlConfig);
Map<String, String> sinkConfig = new HashMap<>();
@@ -61,6 +63,7 @@ public class CdcMysqlSyncDatabaseCase {
sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "");
sinkConfig.put(DorisConfigOptions.JDBC_URL.key(),
"jdbc:mysql://10.20.30.1:9030");
sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(),
UUID.randomUUID().toString());
+ sinkConfig.put("sink.enable-delete", "false");
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index f1e61e72..99892e02 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -49,6 +49,8 @@ public class CdcPostgresSyncDatabaseCase {
sourceConfig.put(PostgresSourceOptions.PG_PORT.key(), "5432");
sourceConfig.put(PostgresSourceOptions.USERNAME.key(), "postgres");
sourceConfig.put(PostgresSourceOptions.PASSWORD.key(), "123456");
+ // add jdbc properties configuration
+ sourceConfig.put("jdbc.properties.ssl", "false");
//
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
// sourceConfig.put("scan.incremental.snapshot.enabled","true");
// sourceConfig.put("debezium.include.schema.changes","false");
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 4e343239..ca6a3121 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -47,6 +47,9 @@ public class CdcSqlServerSyncDatabaseCase {
sourceConfig.put(DatabaseSyncConfig.PORT, "1433");
sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "sa");
sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "Passw@rd");
+ // add jdbc properties configuration
+ sourceConfig.put("jdbc.properties.encrypt", "false");
+ sourceConfig.put("jdbc.properties.integratedSecurity", "false");
//
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
// sourceConfig.put("scan.incremental.snapshot.enabled","true");
// sourceConfig.put("debezium.include.schema.changes","false");
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index f0cd0a51..859a8720 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -20,16 +20,22 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.flink.configuration.Configuration;
import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
import org.junit.Test;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -169,4 +175,95 @@ public class DatabaseSyncTest {
assertFalse("ssb_test.dates".matches(syncTableListPattern));
assertFalse("ssb_test.lineorder".matches(syncTableListPattern));
}
+
+ @Test
+ public void getJdbcPropertiesTest() throws Exception {
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ Map<String, String> mysqlConfig = new HashMap<>();
+ mysqlConfig.put("jdbc.properties.use_ssl", "false");
+
+ Configuration config = Configuration.fromMap(mysqlConfig);
+ databaseSync.setConfig(config);
+ Properties jdbcProperties = databaseSync.getJdbcProperties();
+ Assert.assertEquals(1, jdbcProperties.size());
+ Assert.assertEquals("false", jdbcProperties.getProperty("use_ssl"));
+ }
+
+ @Test
+ public void getJdbcUrlTemplateTest() throws SQLException {
+ String mysqlJdbcTemplate =
"jdbc:mysql://%s:%d?useInformationSchema=true";
+ String postgresJdbcTemplate = "jdbc:postgresql://%s:%d/%s?";
+ String sqlServerJdbcTemplate = "jdbc:sqlserver://%s:%d;database=%s;";
+ String db2JdbcTemplate = "jdbc:db2://%s:%d/%s";
+
+ // mysql jdbc properties configuration
+ DatabaseSync mysqlDatabaseSync = new MysqlDatabaseSync();
+ Map<String, String> mysqlJdbcConfig = new LinkedHashMap<>();
+ mysqlJdbcConfig.put("jdbc.properties.use_ssl", "false");
+
+ DatabaseSync postgresDatabaseSync = new PostgresDatabaseSync();
+ Map<String, String> postgresJdbcConfig = new LinkedHashMap<>();
+ postgresJdbcConfig.put("jdbc.properties.ssl", "false");
+
+ DatabaseSync sqlServerDatabaseSync = new SqlServerDatabaseSync();
+ Map<String, String> sqlServerJdbcConfig = new LinkedHashMap<>();
+ sqlServerJdbcConfig.put("jdbc.properties.encrypt", "false");
+ sqlServerJdbcConfig.put("jdbc.properties.integratedSecurity", "false");
+
+ DatabaseSync db2DatabaseSync = new Db2DatabaseSync();
+ Map<String, String> db2JdbcConfig = new LinkedHashMap<>();
+ db2JdbcConfig.put("jdbc.properties.ssl", "false");
+ db2JdbcConfig.put("jdbc.properties.allowNextOnExhaustedResultSet",
"1");
+ db2JdbcConfig.put("jdbc.properties.resultSetHoldability", "1");
+
+ Configuration mysqlConfig = Configuration.fromMap(mysqlJdbcConfig);
+ mysqlDatabaseSync.setConfig(mysqlConfig);
+
+ Configuration postgresConfig =
Configuration.fromMap(postgresJdbcConfig);
+ postgresDatabaseSync.setConfig(postgresConfig);
+
+ Configuration sqlServerConfig =
Configuration.fromMap(sqlServerJdbcConfig);
+ sqlServerDatabaseSync.setConfig(sqlServerConfig);
+
+ Configuration db2Config = Configuration.fromMap(db2JdbcConfig);
+ db2DatabaseSync.setConfig(db2Config);
+
+ Properties mysqlJdbcProperties = mysqlDatabaseSync.getJdbcProperties();
+ Assert.assertEquals(1, mysqlJdbcProperties.size());
+ Assert.assertEquals("false",
mysqlJdbcProperties.getProperty("use_ssl"));
+ String mysqlJdbcUrlTemplate =
+ mysqlDatabaseSync.getJdbcUrlTemplate(mysqlJdbcTemplate,
mysqlJdbcProperties);
+ Assert.assertEquals(mysqlJdbcTemplate + "&use_ssl=false",
mysqlJdbcUrlTemplate);
+
+ Properties postgresJdbcProperties =
postgresDatabaseSync.getJdbcProperties();
+ Assert.assertEquals(1, postgresJdbcProperties.size());
+ Assert.assertEquals("false",
postgresJdbcProperties.getProperty("ssl"));
+ String postgresJdbcUrlTemplate =
+ postgresDatabaseSync.getJdbcUrlTemplate(
+ postgresJdbcTemplate, postgresJdbcProperties);
+ Assert.assertEquals(postgresJdbcTemplate + "&ssl=false",
postgresJdbcUrlTemplate);
+
+ Properties sqlServerJdbcProperties =
sqlServerDatabaseSync.getJdbcProperties();
+ Assert.assertEquals(2, sqlServerJdbcProperties.size());
+ Assert.assertEquals("false",
sqlServerJdbcProperties.getProperty("encrypt"));
+ Assert.assertEquals("false",
sqlServerJdbcProperties.getProperty("integratedSecurity"));
+ String sqlServerJdbcUrlTemplate =
+ sqlServerDatabaseSync.getJdbcUrlTemplate(
+ sqlServerJdbcTemplate, sqlServerJdbcProperties);
+ Assert.assertEquals(
+ sqlServerJdbcTemplate +
"encrypt=false;integratedSecurity=false;",
+ sqlServerJdbcUrlTemplate);
+
+ Properties db2JdbcProperties = db2DatabaseSync.getJdbcProperties();
+ Assert.assertEquals(3, db2JdbcProperties.size());
+ Assert.assertEquals("false", db2JdbcProperties.getProperty("ssl"));
+ Assert.assertEquals("1",
db2JdbcProperties.getProperty("allowNextOnExhaustedResultSet"));
+ Assert.assertEquals("1",
db2JdbcProperties.getProperty("resultSetHoldability"));
+ String db2JdbcUrlTemplate =
+ db2DatabaseSync.getJdbcUrlTemplate(db2JdbcTemplate,
db2JdbcProperties);
+ Assert.assertEquals(
+ db2JdbcTemplate
+ +
":allowNextOnExhaustedResultSet=1;ssl=false;resultSetHoldability=1;",
+ db2JdbcUrlTemplate);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]