This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 414d7c01 [FLINK-38851] Support passing arbitrary database options to 
JDBC Catalog (#183)
414d7c01 is described below

commit 414d7c018a8646eeaf7a47295e0e4067c032a9a6
Author: Chan hae OH <[email protected]>
AuthorDate: Wed Mar 4 15:04:30 2026 +0900

    [FLINK-38851] Support passing arbitrary database options to JDBC Catalog 
(#183)
    
    [FLINK-38851] Support passing arbitrary database options to JDBC Catalog
    
    Co-authored-by: och5351 <[email protected]>
---
 .../core/database/catalog/AbstractJdbcCatalog.java |  82 ++++++++++--
 .../catalog/factory/JdbcCatalogFactory.java        |   2 +-
 .../catalog/factory/JdbcCatalogFactoryOptions.java |   2 +-
 .../database/catalog/AbstractJdbcCatalogTest.java  |  31 ++++-
 .../cratedb/database/catalog/CrateDBCatalog.java   |   2 +-
 .../database/CreateDBCatalogFactoryTest.java       | 146 +++++++++++++++++++++
 .../jdbc/mysql/database/catalog/MySqlCatalog.java  |   2 +-
 .../mysql/database/MySqlCatalogFactoryTest.java    | 145 ++++++++++++++++++++
 .../database/OceanBaseCatalogFactoryTest.java      | 145 ++++++++++++++++++++
 .../postgres/database/catalog/PostgresCatalog.java |   2 +-
 .../database/PostgresCatalogFactoryTest.java       | 145 ++++++++++++++++++++
 .../postgres/database/PostgresFactoryTest.java     |  90 -------------
 12 files changed, 680 insertions(+), 114 deletions(-)

diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
index 0eedcf82..857176fe 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
@@ -73,6 +73,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static 
org.apache.flink.connector.jdbc.JdbcConnectionOptions.PASSWORD_KEY;
@@ -92,10 +93,19 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class AbstractJdbcCatalog extends AbstractCatalog implements 
JdbcCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+    public static final String NO_DATABASES_HINT =
+            "No default database specified. Please set a database name.";
+    public static final String DATABASE_NOT_UNIQUE_HINT =
+            "Cannot uniquely identify the database name. \n"
+                    + "Please specify a database name using one of these 
methods: \n"
+                    + "\t (1) Match 'default-database' and database name in 
'base-url'. \n"
+                    + "\t (2) Use only 'default-database' without database 
name in 'base-url'. \n "
+                    + "\t (3) Omit 'default-database' and include database 
name in 'base-url'.";
 
     protected final ClassLoader userClassLoader;
     protected final String baseUrl;
     protected final String defaultUrl;
+    protected final Function<String, String> urlFunction;
     protected final Properties connectionProperties;
 
     @Deprecated
@@ -120,17 +130,17 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
             String defaultDatabase,
             String baseUrl,
             Properties connectionProperties) {
-        super(catalogName, defaultDatabase);
+        super(catalogName, validateJdbcUrl(baseUrl, defaultDatabase));
 
         checkNotNull(userClassLoader);
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
 
-        validateJdbcUrl(baseUrl);
-
         this.userClassLoader = userClassLoader;
+        this.urlFunction = calculateUrlFunction(baseUrl);
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
-        this.defaultUrl = getDatabaseUrl(defaultDatabase);
         this.connectionProperties = 
Preconditions.checkNotNull(connectionProperties);
+        this.defaultUrl = this.urlFunction.apply(defaultDatabase);
+
         checkArgument(
                 
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
         checkArgument(
@@ -138,10 +148,6 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
                         connectionProperties.getProperty(PASSWORD_KEY)));
     }
 
-    protected String getDatabaseUrl(String databaseName) {
-        return baseUrl + databaseName;
-    }
-
     @Override
     public void open() throws CatalogException {
         // load the Driver use userClassLoader explicitly, see FLINK-15635 for 
more detail
@@ -274,7 +280,8 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
         String databaseName = tablePath.getDatabaseName();
 
         try (Connection conn =
-                DriverManager.getConnection(getDatabaseUrl(databaseName), 
connectionProperties)) {
+                DriverManager.getConnection(
+                        this.urlFunction.apply(databaseName), 
connectionProperties)) {
             DatabaseMetaData metaData = conn.getMetaData();
             Optional<UniqueConstraint> primaryKey =
                     getPrimaryKey(
@@ -318,7 +325,7 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
     protected Map<String, String> getOptions(ObjectPath tablePath) {
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR.key(), IDENTIFIER);
-        props.put(URL.key(), getDatabaseUrl(tablePath.getDatabaseName()));
+        props.put(URL.key(), 
this.urlFunction.apply(tablePath.getDatabaseName()));
         props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
         props.put(PASSWORD.key(), 
connectionProperties.getProperty(PASSWORD_KEY));
         props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
@@ -579,14 +586,61 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
         throw new UnsupportedOperationException();
     }
 
+    private Function<String, String> calculateUrlFunction(String url) {
+        final String[] parts;
+        final int questionMarkIndex = url.indexOf('?');
+        if (questionMarkIndex == -1) {
+            parts = url.split("/+", 3);
+            return dbName -> parts.length == 3 ? url.trim() : url.trim() + "/" 
+ dbName;
+        } else {
+            String withoutParams = url.substring(0, questionMarkIndex);
+            String prefix = withoutParams.substring(0, 
withoutParams.lastIndexOf('/') + 1);
+            String params = url.substring(questionMarkIndex + 1);
+
+            return dbName -> dbName == null ? url : prefix + dbName + "?" + 
params;
+        }
+    }
+
     /**
      * URL has to be without database, like "jdbc:dialect://localhost:1234/" or
      * "jdbc:dialect://localhost:1234" rather than 
"jdbc:dialect://localhost:1234/db".
      */
-    protected static void validateJdbcUrl(String url) {
-        String[] parts = url.trim().split("\\/+");
-
-        checkArgument(parts.length == 2);
+    protected static String validateJdbcUrl(String url, String 
defaultDatabase) {
+        String trimmedUrl = url.trim();
+        String processedUrl =
+                trimmedUrl.endsWith("/")
+                        ? trimmedUrl.substring(0, trimmedUrl.length() - 1)
+                        : trimmedUrl;
+        String[] parts = processedUrl.split("/+", 3);
+        String database;
+
+        int questionMark = trimmedUrl.indexOf('?');
+        if (questionMark == -1) {
+            if (defaultDatabase == null) {
+                checkArgument(parts.length > 2, NO_DATABASES_HINT);
+                database = parts[2];
+            } else {
+                checkArgument(
+                        parts.length == 2
+                                || (parts.length == 3 && 
defaultDatabase.equals(parts[2])),
+                        DATABASE_NOT_UNIQUE_HINT);
+                database = defaultDatabase;
+            }
+        } else {
+            checkArgument(
+                    parts.length > 2 && !parts[2].startsWith("?"),
+                    "Please set a database name in base-url option.");
+            questionMark = parts[2].indexOf('?');
+            if (defaultDatabase == null) {
+                database = parts[2].substring(0, questionMark);
+            } else {
+                checkArgument(
+                        defaultDatabase.equals(parts[2].substring(0, 
questionMark)),
+                        DATABASE_NOT_UNIQUE_HINT);
+                database = defaultDatabase;
+            }
+        }
+        return database;
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
index ca27d36b..57a8838e 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java
@@ -51,7 +51,6 @@ public class JdbcCatalogFactory implements CatalogFactory {
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(DEFAULT_DATABASE);
         options.add(USERNAME);
         options.add(PASSWORD);
         options.add(BASE_URL);
@@ -61,6 +60,7 @@ public class JdbcCatalogFactory implements CatalogFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
         options.add(PROPERTY_VERSION);
         options.add(COMPATIBLE_MODE);
         return options;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
index 02c607b4..43cbaf88 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
@@ -34,7 +34,7 @@ public class JdbcCatalogFactoryOptions {
     public static final ConfigOption<String> DEFAULT_DATABASE =
             ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
                     .stringType()
-                    .noDefaultValue();
+                    .defaultValue(null);
 
     public static final ConfigOption<String> USERNAME = 
JdbcConnectorOptions.USERNAME;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
index 371140d3..1c67daa2 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java
@@ -26,9 +26,11 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 class AbstractJdbcCatalogTest {
 
     @Test
-    void testJdbcUrl() {
-        AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/");
-        AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234");
+    void testValidJdbcUrl() {
+        
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", "db");
+        
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", null);
+        AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/", 
"db");
+        AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234", 
"db");
     }
 
     @Test
@@ -36,7 +38,26 @@ class AbstractJdbcCatalogTest {
         assertThatThrownBy(
                         () ->
                                 AbstractJdbcCatalog.validateJdbcUrl(
-                                        "jdbc:dialect://localhost:1234/db"))
-                .isInstanceOf(IllegalArgumentException.class);
+                                        "jdbc:dialect://localhost:1234", null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(AbstractJdbcCatalog.NO_DATABASES_HINT);
+        assertThatThrownBy(
+                        () ->
+                                AbstractJdbcCatalog.validateJdbcUrl(
+                                        "jdbc:dialect://localhost:1234/", 
null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(AbstractJdbcCatalog.NO_DATABASES_HINT);
+        assertThatThrownBy(
+                        () ->
+                                AbstractJdbcCatalog.validateJdbcUrl(
+                                        "jdbc:dialect://localhost:1234/db", 
""))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(AbstractJdbcCatalog.DATABASE_NOT_UNIQUE_HINT);
+        assertThatThrownBy(
+                        () ->
+                                AbstractJdbcCatalog.validateJdbcUrl(
+                                        "jdbc:dialect://localhost:1234/db", 
"not_db"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(AbstractJdbcCatalog.DATABASE_NOT_UNIQUE_HINT);
     }
 }
diff --git 
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
 
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
index 288ac0e7..d0e727b9 100644
--- 
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
+++ 
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
@@ -135,7 +135,7 @@ public class CrateDBCatalog extends PostgresCatalog {
 
         String searchPath =
                 extractColumnValuesBySQL(
-                                getDatabaseUrl(DEFAULT_DATABASE), "show 
search_path", 1, null)
+                                urlFunction.apply(DEFAULT_DATABASE), "show 
search_path", 1, null)
                         .get(0);
         String[] schemas = searchPath.split("\\s*,\\s*");
 
diff --git 
a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CreateDBCatalogFactoryTest.java
 
b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CreateDBCatalogFactoryTest.java
new file mode 100644
index 00000000..5e2fc84e
--- /dev/null
+++ 
b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CreateDBCatalogFactoryTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.cratedb.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.cratedb.CrateDBTestBase;
+import org.apache.flink.connector.jdbc.cratedb.database.catalog.CrateDBCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for CrateDB database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests, 
verifying that {@link
+ * CrateDBCatalog} is correctly instantiated and configured.
+ */
+public class CreateDBCatalogFactoryTest implements CrateDBTestBase {
+
+    protected static final String TEST_CATALOG_NAME = "mycratedb";
+
+    @Test
+    void testFactoryWithBaseUrlAndDefaultDatabase() {
+        // Test case 1: base-url without database name + default-database = 
CrateDBCatalog.DEFAULT_DATABASE
+        // jdbc:crate://localhost:49290/crate
+        String jdbcUrl = getMetadata().getJdbcUrl();
+        // jdbc:crate://localhost:49290 (database name and trailing slash stripped)
+        String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        CrateDBCatalog.DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl,
+                        null);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(
+                JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
CrateDBCatalog.DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(CrateDBCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithFullUrlAndNullDefaultDatabase() {
+        // Test case 2: base-url with database name + default-database = null
+        // jdbc:crate://localhost:49290/crate
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        null,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(CrateDBCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithMatchingDefaultDatabase() {
+        // Test case 3: base-url with database name + default-database matches 
the database in URL
+        // jdbc:crate://localhost:49290/crate
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        CrateDBCatalog.DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(
+                JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
CrateDBCatalog.DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(CrateDBCatalog.class);
+    }
+}
diff --git 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
 
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
index 27bdf93e..d693dbe2 100644
--- 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
+++ 
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
@@ -118,7 +118,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         }
 
         return extractColumnValuesBySQL(
-                getDatabaseUrl(databaseName),
+                urlFunction.apply(databaseName),
                 "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE 
TABLE_SCHEMA = ?",
                 1,
                 null,
diff --git 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlCatalogFactoryTest.java
 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlCatalogFactoryTest.java
new file mode 100644
index 00000000..84c1984f
--- /dev/null
+++ 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlCatalogFactoryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.mysql.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.mysql.MySqlTestBase;
+import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for MySQL database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests, 
verifying that {@link
+ * MySqlCatalog} is correctly instantiated and configured.
+ */
+public class MySqlCatalogFactoryTest implements MySqlTestBase {
+
+    protected static final String TEST_CATALOG_NAME = "mysql_catalog";
+    protected static final String DEFAULT_DATABASE = "test";
+
+    @Test
+    void testFactoryWithBaseUrlAndDefaultDatabase() {
+        // Test case 1: base-url without database name + default-database =
+        // "test"
+        // jdbc:mysql://localhost:56336/test
+        String jdbcUrl = getMetadata().getJdbcUrl();
+        // jdbc:mysql://localhost:56336 (database name and trailing slash stripped)
+        String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl,
+                        null);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(MySqlCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithFullUrlAndNullDefaultDatabase() {
+        // Test case 2: base-url with database name + default-database = null
+        // jdbc:mysql://localhost:56336/test
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        null,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(MySqlCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithMatchingDefaultDatabase() {
+        // Test case 3: base-url with database name + default-database matches 
the database in URL
+        // jdbc:mysql://localhost:56336/test
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(MySqlCatalog.class);
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseCatalogFactoryTest.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseCatalogFactoryTest.java
new file mode 100644
index 00000000..6198a4d7
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseCatalogFactoryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for OceanBase database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests, 
verifying that {@link
+ * OceanBaseCatalog} is correctly instantiated and configured.
+ */
+public class OceanBaseCatalogFactoryTest implements OceanBaseMysqlTestBase {
+    protected static final String TEST_CATALOG_NAME = 
"oceanbase_mysql_catalog";
+    protected static final String DEFAULT_DATABASE = "test";
+
+    @Test
+    void testFactoryWithBaseUrlAndDefaultDatabase() {
+        // Test case 1: base-url without database name + default-database =
+        // "test"
+        // 
jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false
+        String jdbcUrl = getMetadata().getJdbcUrl();
+        // jdbc:oceanbase://localhost:56336
+        String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl,
+                        null);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(OceanBaseCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithFullUrlAndNullDefaultDatabase() {
+        // Test case 2: base-url with database name + default-database = null
+        // 
jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        null,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(OceanBaseCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithMatchingDefaultDatabase() {
+        // Test case 3: base-url with database name + default-database matches 
the database in URL
+        // 
jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(catalog).isInstanceOf(OceanBaseCatalog.class);
+    }
+}
diff --git 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
index def48d64..4aff5907 100644
--- 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
+++ 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
@@ -187,7 +187,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
             throw new DatabaseNotExistException(getName(), databaseName);
         }
 
-        final String url = getDatabaseUrl(databaseName);
+        final String url = urlFunction.apply(databaseName);
         try (Connection conn = DriverManager.getConnection(url, 
connectionProperties)) {
             // get all schemas
             List<String> schemas;
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresCatalogFactoryTest.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresCatalogFactoryTest.java
new file mode 100644
index 00000000..331e1b49
--- /dev/null
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresCatalogFactoryTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.postgres.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
+import 
org.apache.flink.connector.jdbc.postgres.database.catalog.PostgresCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for Postgres database catalog factory.
+ *
+ * <p>This test follows the same pattern as other database factory tests, 
verifying that {@link
+ * PostgresCatalog} is correctly instantiated and configured.
+ */
+class PostgresCatalogFactoryTest implements PostgresTestBase {
+
+    private static final String TEST_CATALOG_NAME = "mypg";
+    private static final String DEFAULT_DATABASE = "test";
+
+    @Test
+    void testFactoryWithBaseUrlAndDefaultDatabase() {
+        // Test case 1: base-url without database name + default-database = 
"postgres"
+        // jdbc:postgresql://localhost:50807/postgres -> 
jdbc:postgresql://localhost:50807
+        String jdbcUrl = getMetadata().getJdbcUrl();
+        String baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+        JdbcCatalog expectedCatalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        PostgresCatalog.DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl,
+                        null);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(
+                JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
PostgresCatalog.DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(expectedCatalog).isInstanceOf(PostgresCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithFullUrlAndNullDefaultDatabase() {
+        // Test case 2: base-url with database name + default-database = null
+        // jdbc:postgresql://localhost:53036/test?loggerLevel=OFF (full URL)
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog expectedCatalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        null,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(expectedCatalog).isInstanceOf(PostgresCatalog.class);
+    }
+
+    @Test
+    void testFactoryWithMatchingDefaultDatabase() {
+        // Test case 3: base-url with database name + default-database matches 
the database in URL
+        // jdbc:postgresql://localhost:53036/test?loggerLevel=OFF + 
default-database = "test"
+        String baseUrl = getMetadata().getJdbcUrl();
+
+        JdbcCatalog expectedCatalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        
assertThat(actualCatalog).isEqualTo(expectedCatalog).isInstanceOf(PostgresCatalog.class);
+    }
+}
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java
deleted file mode 100644
index 1b3d2330..00000000
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc.postgres.database;
-
-import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
-import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
-import 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory;
-import 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
-import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
-import 
org.apache.flink.connector.jdbc.postgres.database.catalog.PostgresCatalog;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
-import org.apache.flink.table.factories.FactoryUtil;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link JdbcCatalogFactory}. */
-class PostgresFactoryTest implements PostgresTestBase {
-
-    protected static String baseUrl;
-    protected static JdbcCatalog catalog;
-
-    protected static final String TEST_CATALOG_NAME = "mypg";
-
-    @BeforeEach
-    void setup() {
-        // jdbc:postgresql://localhost:50807/postgres?user=postgres
-        String jdbcUrl = getMetadata().getJdbcUrl();
-        // jdbc:postgresql://localhost:50807/
-        baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
-
-        catalog =
-                JdbcFactoryLoader.loadCatalog(
-                        Thread.currentThread().getContextClassLoader(),
-                        TEST_CATALOG_NAME,
-                        PostgresCatalog.DEFAULT_DATABASE,
-                        getMetadata().getUsername(),
-                        getMetadata().getPassword(),
-                        baseUrl,
-                        null);
-    }
-
-    @Test
-    void test() {
-        final Map<String, String> options = new HashMap<>();
-        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
-        options.put(
-                JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
PostgresCatalog.DEFAULT_DATABASE);
-        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), 
getMetadata().getUsername());
-        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), 
getMetadata().getPassword());
-        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
-
-        final Catalog actualCatalog =
-                FactoryUtil.createCatalog(
-                        TEST_CATALOG_NAME,
-                        options,
-                        null,
-                        Thread.currentThread().getContextClassLoader());
-
-        checkEquals(catalog, (JdbcCatalog) actualCatalog);
-
-        assertThat((JdbcCatalog) 
actualCatalog).isInstanceOf(PostgresCatalog.class);
-    }
-
-    private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) {
-        assertThat(c2).isEqualTo(c1);
-    }
-}

Reply via email to