This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 414d7c01 [FLINK-38851] Support passing arbitrary database options to
JDBC Catalog (#183)
414d7c01 is described below
commit 414d7c018a8646eeaf7a47295e0e4067c032a9a6
Author: Chan hae OH <[email protected]>
AuthorDate: Wed Mar 4 15:04:30 2026 +0900
[FLINK-38851] Support passing arbitrary database options to JDBC Catalog
(#183)
[FLINK-38851] Support passing arbitrary database options to JDBC Catalog
Co-authored-by: och5351 <[email protected]>
---
.../core/database/catalog/AbstractJdbcCatalog.java | 82 ++++++++++--
.../catalog/factory/JdbcCatalogFactory.java | 2 +-
.../catalog/factory/JdbcCatalogFactoryOptions.java | 2 +-
.../database/catalog/AbstractJdbcCatalogTest.java | 31 ++++-
.../cratedb/database/catalog/CrateDBCatalog.java | 2 +-
.../database/CreateDBCatalogFactoryTest.java | 146 +++++++++++++++++++++
.../jdbc/mysql/database/catalog/MySqlCatalog.java | 2 +-
.../mysql/database/MySqlCatalogFactoryTest.java | 145 ++++++++++++++++++++
.../database/OceanBaseCatalogFactoryTest.java | 145 ++++++++++++++++++++
.../postgres/database/catalog/PostgresCatalog.java | 2 +-
.../database/PostgresCatalogFactoryTest.java | 145 ++++++++++++++++++++
.../postgres/database/PostgresFactoryTest.java | 90 -------------
12 files changed, 680 insertions(+), 114 deletions(-)
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
index 0eedcf82..857176fe 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
@@ -73,6 +73,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
+import java.util.function.Function;
import java.util.function.Predicate;
import static
org.apache.flink.connector.jdbc.JdbcConnectionOptions.PASSWORD_KEY;
@@ -92,10 +93,19 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
public abstract class AbstractJdbcCatalog extends AbstractCatalog implements
JdbcCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+ public static final String NO_DATABASES_HINT =
+ "No default database specified. Please set a database name.";
+ public static final String DATABASE_NOT_UNIQUE_HINT =
+ "Cannot uniquely identify the database name. \n"
+ + "Please specify a database name using one of these
methods: \n"
+ + "\t (1) Match 'default-database' and database name in
'base-url'. \n"
+ + "\t (2) Use only 'default-database' without database
name in 'base-url'. \n "
+ + "\t (3) Omit 'default-database' and include database
name in 'base-url'.";
protected final ClassLoader userClassLoader;
protected final String baseUrl;
protected final String defaultUrl;
+ protected final Function<String, String> urlFunction;
protected final Properties connectionProperties;
@Deprecated
@@ -120,17 +130,17 @@ public abstract class AbstractJdbcCatalog extends
AbstractCatalog implements Jdb
String defaultDatabase,
String baseUrl,
Properties connectionProperties) {
- super(catalogName, defaultDatabase);
+ super(catalogName, validateJdbcUrl(baseUrl, defaultDatabase));
checkNotNull(userClassLoader);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
- validateJdbcUrl(baseUrl);
-
this.userClassLoader = userClassLoader;
+ this.urlFunction = calculateUrlFunction(baseUrl);
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
- this.defaultUrl = getDatabaseUrl(defaultDatabase);
this.connectionProperties =
Preconditions.checkNotNull(connectionProperties);
+ this.defaultUrl = this.urlFunction.apply(defaultDatabase);
+
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
checkArgument(
@@ -138,10 +148,6 @@ public abstract class AbstractJdbcCatalog extends
AbstractCatalog implements Jdb
connectionProperties.getProperty(PASSWORD_KEY)));
}
- protected String getDatabaseUrl(String databaseName) {
- return baseUrl + databaseName;
- }
-
@Override
public void open() throws CatalogException {
// load the Driver use userClassLoader explicitly, see FLINK-15635 for
more detail
@@ -274,7 +280,8 @@ public abstract class AbstractJdbcCatalog extends
AbstractCatalog implements Jdb
String databaseName = tablePath.getDatabaseName();
try (Connection conn =
- DriverManager.getConnection(getDatabaseUrl(databaseName),
connectionProperties)) {
+ DriverManager.getConnection(
+ this.urlFunction.apply(databaseName),
connectionProperties)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<UniqueConstraint> primaryKey =
getPrimaryKey(
@@ -318,7 +325,7 @@ public abstract class AbstractJdbcCatalog extends
AbstractCatalog implements Jdb
protected Map<String, String> getOptions(ObjectPath tablePath) {
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR.key(), IDENTIFIER);
- props.put(URL.key(), getDatabaseUrl(tablePath.getDatabaseName()));
+ props.put(URL.key(),
this.urlFunction.apply(tablePath.getDatabaseName()));
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
props.put(PASSWORD.key(),
connectionProperties.getProperty(PASSWORD_KEY));
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
@@ -579,14 +586,61 @@ public abstract class AbstractJdbcCatalog extends
AbstractCatalog implements Jdb
throw new UnsupportedOperationException();
}
+ private Function<String, String> calculateUrlFunction(String url) {
+ final String[] parts;
+ final int questionMarkIndex = url.indexOf('?');
+ if (questionMarkIndex == -1) {
+ parts = url.split("/+", 3);
+ return dbName -> parts.length == 3 ? url.trim() : url.trim() + "/"
+ dbName;
+ } else {
+ String withoutParams = url.substring(0, questionMarkIndex);
+ String prefix = withoutParams.substring(0,
withoutParams.lastIndexOf('/') + 1);
+ String params = url.substring(questionMarkIndex + 1);
+
+ return dbName -> dbName == null ? url : prefix + dbName + "?" +
params;
+ }
+ }
+
/**
* URL has to be without database, like "jdbc:dialect://localhost:1234/" or
* "jdbc:dialect://localhost:1234" rather than
"jdbc:dialect://localhost:1234/db".
*/
- protected static void validateJdbcUrl(String url) {
- String[] parts = url.trim().split("\\/+");
-
- checkArgument(parts.length == 2);
+ protected static String validateJdbcUrl(String url, String
defaultDatabase) {
+ String trimmedUrl = url.trim();
+ String processedUrl =
+ trimmedUrl.endsWith("/")
+ ? trimmedUrl.substring(0, trimmedUrl.length() - 1)
+ : trimmedUrl;
+ String[] parts = processedUrl.split("/+", 3);
+ String database;
+
+ int questionMark = trimmedUrl.indexOf('?');
+ if (questionMark == -1) {
+ if (defaultDatabase == null) {
+ checkArgument(parts.length > 2, NO_DATABASES_HINT);
+ database = parts[2];
+ } else {
+ checkArgument(
+ parts.length == 2
+ || (parts.length == 3 &&
defaultDatabase.equals(parts[2])),
+ DATABASE_NOT_UNIQUE_HINT);
+ database = defaultDatabase;
+ }
+ } else {
+ checkArgument(
+ parts.length > 2 && !parts[2].startsWith("?"),
+ "Please set a database name in base-url option.");
+ questionMark = parts[2].indexOf('?');
+ if (defaultDatabase == null) {
+ database = parts[2].substring(0, questionMark);
+ } else {
+ checkArgument(
+ defaultDatabase.equals(parts[2].substring(0,
questionMark)),
+ DATABASE_NOT_UNIQUE_HINT);
+ database = defaultDatabase;
+ }
+ }
+ return database;
}
@Override
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
index ca27d36b..57a8838e 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
@@ -51,7 +51,6 @@ public class JdbcCatalogFactory implements CatalogFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(DEFAULT_DATABASE);
options.add(USERNAME);
options.add(PASSWORD);
options.add(BASE_URL);
@@ -61,6 +60,7 @@ public class JdbcCatalogFactory implements CatalogFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DEFAULT_DATABASE);
options.add(PROPERTY_VERSION);
options.add(COMPATIBLE_MODE);
return options;
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
index 02c607b4..43cbaf88 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
@@ -34,7 +34,7 @@ public class JdbcCatalogFactoryOptions {
public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
.stringType()
- .noDefaultValue();
+ .defaultValue(null);
public static final ConfigOption<String> USERNAME =
JdbcConnectorOptions.USERNAME;
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
index 371140d3..1c67daa2 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
@@ -26,9 +26,11 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
class AbstractJdbcCatalogTest {
@Test
- void testJdbcUrl() {
- AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/");
- AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234");
+ void testValidJdbcUrl() {
+
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", "db");
+
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", null);
+ AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/",
"db");
+ AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234",
"db");
}
@Test
@@ -36,7 +38,26 @@ class AbstractJdbcCatalogTest {
assertThatThrownBy(
() ->
AbstractJdbcCatalog.validateJdbcUrl(
- "jdbc:dialect://localhost:1234/db"))
- .isInstanceOf(IllegalArgumentException.class);
+ "jdbc:dialect://localhost:1234", null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(AbstractJdbcCatalog.NO_DATABASES_HINT);
+ assertThatThrownBy(
+ () ->
+ AbstractJdbcCatalog.validateJdbcUrl(
+ "jdbc:dialect://localhost:1234/",
null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(AbstractJdbcCatalog.NO_DATABASES_HINT);
+ assertThatThrownBy(
+ () ->
+ AbstractJdbcCatalog.validateJdbcUrl(
+ "jdbc:dialect://localhost:1234/db",
""))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(AbstractJdbcCatalog.DATABASE_NOT_UNIQUE_HINT);
+ assertThatThrownBy(
+ () ->
+ AbstractJdbcCatalog.validateJdbcUrl(
+ "jdbc:dialect://localhost:1234/db",
"not_db"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(AbstractJdbcCatalog.DATABASE_NOT_UNIQUE_HINT);
}
}
diff --git
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
index 288ac0e7..d0e727b9 100644
---
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
+++
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
@@ -135,7 +135,7 @@ public class CrateDBCatalog extends PostgresCatalog {
String searchPath =
extractColumnValuesBySQL(
- getDatabaseUrl(DEFAULT_DATABASE), "show
search_path", 1, null)
+ urlFunction.apply(DEFAULT_DATABASE), "show
search_path", 1, null)
.get(0);
String[] schemas = searchPath.split("\\s*,\\s*");
diff --git
a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CreateDBCatalogFactoryTest.java
b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CreateDBCatalogFactoryTest.java
new file mode 100644
index 00000000..5e2fc84e
--- /dev/null
+++
b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CreateDBCatalogFactoryTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.cratedb.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.cratedb.CrateDBTestBase;
+import org.apache.flink.connector.jdbc.cratedb.database.catalog.CrateDBCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for CrateDB database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests,
verifying that {@link
+ * CrateDBCatalog} is correctly instantiated and configured.
+ */
+public class CreateDBCatalogFactoryTest implements CrateDBTestBase {
+
+ protected static final String TEST_CATALOG_NAME = "mycratedb";
+
+ @Test
+ void testFactoryWithBaseUrlAndDefaultDatabase() {
+ // Test case 1: base-url without database name + default-database =
"mycratedb"
+ // jdbc:crate://localhost:49290/crate
+ String jdbcUrl = getMetadata().getJdbcUrl();
+ // jdbc:crate://localhost:49290/
+ String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ CrateDBCatalog.DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl,
+ null);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(
+ JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
CrateDBCatalog.DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(CrateDBCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithFullUrlAndNullDefaultDatabase() {
+ // Test case 2: base-url with database name + default-database = null
+ // jdbc:crate://localhost:49290/crate
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ null,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(CrateDBCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithMatchingDefaultDatabase() {
+ // Test case 3: base-url with database name + default-database matches
the database in URL
+ // jdbc:crate://localhost:49290/crate
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ CrateDBCatalog.DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(
+ JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
CrateDBCatalog.DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(CrateDBCatalog.class);
+ }
+}
diff --git
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
index 27bdf93e..d693dbe2 100644
---
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
+++
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
@@ -118,7 +118,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
}
return extractColumnValuesBySQL(
- getDatabaseUrl(databaseName),
+ urlFunction.apply(databaseName),
"SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE
TABLE_SCHEMA = ?",
1,
null,
diff --git
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlCatalogFactoryTest.java
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlCatalogFactoryTest.java
new file mode 100644
index 00000000..84c1984f
--- /dev/null
+++
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlCatalogFactoryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.mysql.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.mysql.MySqlTestBase;
+import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for MySQL database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests,
verifying that {@link
+ * MySqlCatalog} is correctly instantiated and configured.
+ */
+public class MySqlCatalogFactoryTest implements MySqlTestBase {
+
+ protected static final String TEST_CATALOG_NAME = "mysql_catalog";
+ protected static final String DEFAULT_DATABASE = "test";
+
+ @Test
+ void testFactoryWithBaseUrlAndDefaultDatabase() {
+ // Test case 1: base-url without database name + default-database =
+ // "oceanbase_mysql_catalog"
+ // jdbc:mysql://localhost:56336/test
+ String jdbcUrl = getMetadata().getJdbcUrl();
+ // jdbc:mysql://localhost:56336/
+ String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl,
+ null);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(MySqlCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithFullUrlAndNullDefaultDatabase() {
+ // Test case 2: base-url with database name + default-database = null
+ // jdbc:mysql://localhost:56336/test
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ null,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(MySqlCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithMatchingDefaultDatabase() {
+ // Test case 3: base-url with database name + default-database matches
the database in URL
+ // jdbc:mysql://localhost:56336/test
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(MySqlCatalog.class);
+ }
+}
diff --git
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseCatalogFactoryTest.java
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseCatalogFactoryTest.java
new file mode 100644
index 00000000..6198a4d7
--- /dev/null
+++
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseCatalogFactoryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase;
+import
org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for OceanBase database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests,
verifying that {@link
+ * OceanBaseCatalog} is correctly instantiated and configured.
+ */
+public class OceanBaseCatalogFactoryTest implements OceanBaseMysqlTestBase {
+ protected static final String TEST_CATALOG_NAME =
"oceanbase_mysql_catalog";
+ protected static final String DEFAULT_DATABASE = "test";
+
+ @Test
+ void testFactoryWithBaseUrlAndDefaultDatabase() {
+ // Test case 1: base-url without database name + default-database =
+ // "oceanbase_mysql_catalog"
+ //
jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false
+ String jdbcUrl = getMetadata().getJdbcUrl();
+ // jdbc:mysql://localhost:56336/
+ String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl,
+ null);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(OceanBaseCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithFullUrlAndNullDefaultDatabase() {
+ // Test case 2: base-url with database name + default-database = null
+ //
jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ null,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(OceanBaseCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithMatchingDefaultDatabase() {
+ // Test case 3: base-url with database name + default-database matches
the database in URL
+ //
jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(OceanBaseCatalog.class);
+ }
+}
diff --git
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
index def48d64..4aff5907 100644
---
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
+++
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
@@ -187,7 +187,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
throw new DatabaseNotExistException(getName(), databaseName);
}
- final String url = getDatabaseUrl(databaseName);
+ final String url = urlFunction.apply(databaseName);
try (Connection conn = DriverManager.getConnection(url,
connectionProperties)) {
// get all schemas
List<String> schemas;
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresCatalogFactoryTest.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresCatalogFactoryTest.java
new file mode 100644
index 00000000..331e1b49
--- /dev/null
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresCatalogFactoryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.postgres.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
+import
org.apache.flink.connector.jdbc.postgres.database.catalog.PostgresCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for OceanBase database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests,
verifying that {@link
+ * PostgresCatalog} is correctly instantiated and configured.
+ */
+class PostgresCatalogFactoryTest implements PostgresTestBase {
+
+ private static final String TEST_CATALOG_NAME = "mypg";
+ private static final String DEFAULT_DATABASE = "test";
+
+ @Test
+ void testFactoryWithBaseUrlAndDefaultDatabase() {
+ // Test case 1: base-url without database name + default-database =
"postgres"
+ // jdbc:postgresql://localhost:50807/postgres ->
jdbc:postgresql://localhost:50807/
+ String jdbcUrl = getMetadata().getJdbcUrl();
+ String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+ JdbcCatalog expectedCatalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ PostgresCatalog.DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl,
+ null);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(
+ JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
PostgresCatalog.DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(expectedCatalog).isInstanceOf(PostgresCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithFullUrlAndNullDefaultDatabase() {
+ // Test case 2: base-url with database name + default-database = null
+ // jdbc:postgresql://localhost:53036/test?loggerLevel=OFF (full URL)
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog expectedCatalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ null,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(expectedCatalog).isInstanceOf(PostgresCatalog.class);
+ }
+
+ @Test
+ void testFactoryWithMatchingDefaultDatabase() {
+ // Test case 3: base-url with database name + default-database matches
the database in URL
+ // jdbc:postgresql://localhost:53036/test?loggerLevel=OFF +
default-database = "test"
+ String baseUrl = getMetadata().getJdbcUrl();
+
+ JdbcCatalog expectedCatalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
+ options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
DEFAULT_DATABASE);
+ options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
+ options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
+ options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ TEST_CATALOG_NAME,
+ options,
+ null,
+ Thread.currentThread().getContextClassLoader());
+
+
assertThat(actualCatalog).isEqualTo(expectedCatalog).isInstanceOf(PostgresCatalog.class);
+ }
+}
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java
deleted file mode 100644
index 1b3d2330..00000000
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc.postgres.database;
-
-import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
-import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
-import
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory;
-import
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
-import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
-import
org.apache.flink.connector.jdbc.postgres.database.catalog.PostgresCatalog;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
-import org.apache.flink.table.factories.FactoryUtil;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link JdbcCatalogFactory}. */
-class PostgresFactoryTest implements PostgresTestBase {
-
- protected static String baseUrl;
- protected static JdbcCatalog catalog;
-
- protected static final String TEST_CATALOG_NAME = "mypg";
-
- @BeforeEach
- void setup() {
- // jdbc:postgresql://localhost:50807/postgres?user=postgres
- String jdbcUrl = getMetadata().getJdbcUrl();
- // jdbc:postgresql://localhost:50807/
- baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
-
- catalog =
- JdbcFactoryLoader.loadCatalog(
- Thread.currentThread().getContextClassLoader(),
- TEST_CATALOG_NAME,
- PostgresCatalog.DEFAULT_DATABASE,
- getMetadata().getUsername(),
- getMetadata().getPassword(),
- baseUrl,
- null);
- }
-
- @Test
- void test() {
- final Map<String, String> options = new HashMap<>();
- options.put(CommonCatalogOptions.CATALOG_TYPE.key(),
JdbcCatalogFactoryOptions.IDENTIFIER);
- options.put(
- JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(),
PostgresCatalog.DEFAULT_DATABASE);
- options.put(JdbcCatalogFactoryOptions.USERNAME.key(),
getMetadata().getUsername());
- options.put(JdbcCatalogFactoryOptions.PASSWORD.key(),
getMetadata().getPassword());
- options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
-
- final Catalog actualCatalog =
- FactoryUtil.createCatalog(
- TEST_CATALOG_NAME,
- options,
- null,
- Thread.currentThread().getContextClassLoader());
-
- checkEquals(catalog, (JdbcCatalog) actualCatalog);
-
- assertThat((JdbcCatalog)
actualCatalog).isInstanceOf(PostgresCatalog.class);
- }
-
- private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) {
- assertThat(c2).isEqualTo(c1);
- }
-}