This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new ddd8f808b [improve][jdbc] Reduce jdbc options configuration (#4218)
ddd8f808b is described below

commit ddd8f808b561a11de3ef307f4b1a2fb9d93385cd
Author: Zongwen Li <[email protected]>
AuthorDate: Sat Feb 25 15:09:43 2023 +0800

    [improve][jdbc] Reduce jdbc options configuration (#4218)
---
 docs/en/connector-v2/source/MySQL-CDC.md           | 21 ++----
 .../util/OptionValidationException.java            |  8 +++
 .../apache/seatunnel/common/utils/JdbcUrlUtil.java | 75 ++++++++++++++++++++++
 .../seatunnel/common/utils/JdbcUrlUtilTest.java    | 50 +++++++++++++++
 .../cdc/mysql/source/MySqlIncrementalSource.java   | 11 ++--
 .../source/MySqlIncrementalSourceFactory.java      |  2 -
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 62 ++----------------
 .../seatunnel/jdbc/catalog/JdbcCatalogOptions.java |  4 +-
 .../seatunnel/jdbc/catalog/MySqlCatalog.java       |  8 +--
 .../jdbc/catalog/MySqlCatalogFactory.java          | 14 +++-
 .../jdbc/catalog/AbstractJdbcCatalogTest.java      | 48 --------------
 .../core/parse/MultipleTableJobConfigParser.java   |  1 -
 12 files changed, 165 insertions(+), 139 deletions(-)

diff --git a/docs/en/connector-v2/source/MySQL-CDC.md 
b/docs/en/connector-v2/source/MySQL-CDC.md
index 3bccb2aec..43cbca07d 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -20,11 +20,9 @@ describes how to setup the MySQL CDC connector to run SQL 
queries against MySQL
 
 | name                                           | type     | required | 
default value |
 
|------------------------------------------------|----------|----------|---------------|
-| hostname                                       | String   | Yes      | -     
        |
-| port                                           | Integer  | No       | 3306  
        |
 | username                                       | String   | Yes      | -     
        |
 | password                                       | String   | Yes      | -     
        |
-| database-names                                 | List     | Yes      | -     
        |
+| database-names                                 | List     | No       | -     
        |
 | table-names                                    | List     | Yes      | -     
        |
 | base-url                                       | String   | Yes      | -     
        |
 | startup.mode                                   | Enum     | No       | 
INITIAL       |
@@ -48,14 +46,6 @@ describes how to setup the MySQL CDC connector to run SQL 
queries against MySQL
 | debezium.*                                     | config   | No       | -     
        |
 | common-options                                 |          | no       | -     
        |
 
-### hostname [String]
-
-IP address or hostname of the database server.
-
-### port [Integer]
-
-Integer port number of the database server.
-
 ### username [String]
 
 Name of the database user to use when connecting to the database server.
@@ -70,12 +60,11 @@ Database name of the database to monitor.
 
 ### table-names [List]
 
-Table name of the database to monitor.
+Table names of the database to monitor. Each table name must be qualified with its 
database name, for example: `database_name.table_name`.
 
 ### base-url [String]
 
-URL has to be without database, like "jdbc:mysql://localhost:5432/" or 
"jdbc:mariadb://localhost:5432" rather than "
-jdbc:polardb://localhost:5432/db"
+The URL must include the database name, for example "jdbc:mysql://localhost:5432/db" or 
"jdbc:mysql://localhost:5432/db?useSSL=true".
 
 ### startup.mode [Enum]
 
@@ -192,12 +181,10 @@ source {
     result_table_name = "fake"
     parallelism = 1
     server-id = 5656
-    port = 56725
-    hostname = "127.0.0.1"
     username = "mysqluser"
     password = "mysqlpw"
     table-names = ["inventory_vwyw0n.products"]
-    base-url = "jdbc:mysql://localhost:56725"
+    base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
   }
 }
 ```
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
index cc2422e09..1ea58af67 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.configuration.util;
 
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 /**
@@ -36,4 +37,11 @@ public class OptionValidationException extends 
SeaTunnelRuntimeException {
     public OptionValidationException(String formatMessage, Object... args) {
         super(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, 
String.format(formatMessage, args));
     }
+
+    public OptionValidationException(Option<?> option) {
+        super(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
+            String.format("The option(\"%s\")  is incorrectly configured, 
please refer to the doc: %s",
+                option.key(),
+                option.getDescription()));
+    }
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JdbcUrlUtil.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JdbcUrlUtil.java
new file mode 100644
index 000000000..9e2251632
--- /dev/null
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JdbcUrlUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public final class JdbcUrlUtil {
+    private static final Pattern URL_PATTERN = 
Pattern.compile("^(?<url>jdbc:.+?//(?<host>.+?):(?<port>\\d+?))(/(?<database>.*?))*(\\?.*)*$");
+
+    private JdbcUrlUtil() {}
+
+    public static JdbcUrlUtil.UrlInfo getUrlInfo(String url) {
+        Matcher matcher = URL_PATTERN.matcher(url);
+        if (matcher.find()) {
+            String urlWithoutDatabase = matcher.group("url");
+            String database = matcher.group("database");
+            return new JdbcUrlUtil.UrlInfo(
+                    urlWithoutDatabase,
+                    matcher.group("host"),
+                    Integer.valueOf(matcher.group("port")),
+                    database
+            );
+        }
+        throw new IllegalArgumentException("The jdbc url format is incorrect: 
" + url);
+    }
+
+    @Data
+    public static class UrlInfo implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final String urlWithoutDatabase;
+        private final String host;
+        private final Integer port;
+        private final String defaultDatabase;
+
+        public UrlInfo(String urlWithoutDatabase,
+                       String host,
+                       Integer port,
+                       String defaultDatabase) {
+            this.urlWithoutDatabase = urlWithoutDatabase;
+            this.host = host;
+            this.port = port;
+            this.defaultDatabase = defaultDatabase;
+        }
+
+        public Optional<String> getUrlWithDatabase() {
+            return StringUtils.isBlank(defaultDatabase) ? Optional.empty() :
+                    Optional.of(urlWithoutDatabase + "/" + defaultDatabase);
+        }
+
+        public Optional<String> getDefaultDatabase() {
+            return StringUtils.isBlank(defaultDatabase) ? Optional.empty() : 
Optional.of(defaultDatabase);
+        }
+    }
+}
diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/JdbcUrlUtilTest.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/JdbcUrlUtilTest.java
new file mode 100644
index 000000000..952189b1a
--- /dev/null
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/JdbcUrlUtilTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+public class JdbcUrlUtilTest {
+
+    @Test
+    public void testMySQLUrlWithDatabase() {
+        JdbcUrlUtil.UrlInfo urlInfo = 
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310/seatunnel?useSSL=true");
+        Assertions.assertTrue(urlInfo.getUrlWithDatabase().isPresent());
+        Assertions.assertTrue(urlInfo.getDefaultDatabase().isPresent());
+        Assertions.assertEquals("seatunnel", 
urlInfo.getDefaultDatabase().get());
+        Assertions.assertEquals("jdbc:mysql://192.168.1.1:5310/seatunnel", 
urlInfo.getUrlWithDatabase().get());
+        Assertions.assertEquals("jdbc:mysql://192.168.1.1:5310", 
urlInfo.getUrlWithoutDatabase());
+        Assertions.assertEquals("192.168.1.1", urlInfo.getHost());
+        Assertions.assertEquals(5310, urlInfo.getPort());
+        Assertions.assertEquals(urlInfo, 
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310/seatunnel"));
+    }
+
+    @Test
+    public void testMySQLUrlWithoutDatabase() {
+        JdbcUrlUtil.UrlInfo urlInfo = 
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310/");
+        Assertions.assertFalse(urlInfo.getUrlWithDatabase().isPresent());
+        Assertions.assertFalse(urlInfo.getDefaultDatabase().isPresent());
+        Assertions.assertEquals("jdbc:mysql://192.168.1.1:5310", 
urlInfo.getUrlWithoutDatabase());
+        Assertions.assertEquals("192.168.1.1", urlInfo.getHost());
+        Assertions.assertEquals(5310, urlInfo.getPort());
+        Assertions.assertEquals(urlInfo, 
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310"));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 43734efbd..81d6581e4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -20,11 +20,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
@@ -37,7 +39,7 @@ import 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDese
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalogFactory;
 
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
@@ -63,6 +65,9 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
         MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
         configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
         configFactory.fromReadonlyConfig(readonlyConfig);
+        JdbcUrlUtil.UrlInfo urlInfo = 
JdbcUrlUtil.getUrlInfo(config.get(JdbcCatalogOptions.BASE_URL));
+        configFactory.hostname(urlInfo.getHost());
+        configFactory.port(urlInfo.getPort());
         configFactory.startupOptions(startupConfig);
         configFactory.stopOptions(stopConfig);
         return configFactory;
@@ -71,12 +76,10 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
     @SuppressWarnings("unchecked")
     @Override
     public DebeziumDeserializationSchema<T> 
createDebeziumDeserializationSchema(ReadonlyConfig config) {
-        JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
-        String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
         SeaTunnelDataType<SeaTunnelRow> physicalRowType;
         if (dataType == null) {
             // TODO: support metadata keys
-            MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", 
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), 
jdbcSourceConfig.getPassword(), baseUrl);
+            Catalog mySqlCatalog = new 
MySqlCatalogFactory().createCatalog("mysql", config);
             CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
             physicalRowType = table.getTableSchema().toPhysicalRowDataType();
         } else {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index e22148f81..3ee38ab2f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -52,14 +52,12 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
     public OptionRule optionRule() {
         return JdbcSourceOptions.getBaseRule()
             .required(
-                JdbcSourceOptions.HOSTNAME,
                 JdbcSourceOptions.USERNAME,
                 JdbcSourceOptions.PASSWORD,
                 CatalogOptions.TABLE_NAMES,
                 JdbcCatalogOptions.BASE_URL)
             .optional(
                 JdbcSourceOptions.DATABASE_NAMES,
-                JdbcSourceOptions.PORT,
                 JdbcSourceOptions.SERVER_ID,
                 JdbcSourceOptions.SERVER_TIME_ZONE,
                 JdbcSourceOptions.CONNECT_TIMEOUT_MS,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index e6f64d8f3..b5dbbce45 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -63,76 +63,22 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     public AbstractJdbcCatalog(
         String catalogName,
-        String defaultDatabase,
         String username,
         String pwd,
-        String baseUrl) {
+        String defaultDatabase,
+        String baseUrl,
+        String defaultUrl) {
 
         checkArgument(StringUtils.isNotBlank(username));
         checkArgument(StringUtils.isNotBlank(pwd));
+        checkArgument(StringUtils.isNotBlank(defaultDatabase));
         checkArgument(StringUtils.isNotBlank(baseUrl));
-
-        baseUrl = baseUrl.trim();
-        validateJdbcUrlWithoutDatabase(baseUrl);
         this.catalogName = catalogName;
         this.defaultDatabase = defaultDatabase;
         this.username = username;
         this.pwd = pwd;
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
-        this.defaultUrl = this.baseUrl + defaultDatabase;
-    }
-
-    /**
-     * URL has to be without database, like "jdbc:mysql://localhost:5432/" or
-     * "jdbc:mysql://localhost:5432" rather than 
"jdbc:mysql://localhost:5432/db".
-     */
-    public static void validateJdbcUrlWithoutDatabase(String url) {
-        String[] parts = url.trim().split("\\/+");
-
-        checkArgument(parts.length == 2);
-    }
-
-    public AbstractJdbcCatalog(
-        String catalogName,
-        String username,
-        String pwd,
-        String defaultUrl) {
-
-        checkArgument(StringUtils.isNotBlank(username));
-        checkArgument(StringUtils.isNotBlank(pwd));
-        checkArgument(StringUtils.isNotBlank(defaultUrl));
-
-        defaultUrl = defaultUrl.trim();
-        validateJdbcUrlWithDatabase(defaultUrl);
-        this.catalogName = catalogName;
-        this.username = username;
-        this.pwd = pwd;
         this.defaultUrl = defaultUrl;
-        String[] strings = splitDefaultUrl(defaultUrl);
-        this.baseUrl = strings[0];
-        this.defaultDatabase = strings[1];
-    }
-
-    /**
-     * URL has to be with database, like "jdbc:mysql://localhost:5432/db" 
rather than "jdbc:mysql://localhost:5432/".
-     */
-    @SuppressWarnings("MagicNumber")
-    public static void validateJdbcUrlWithDatabase(String url) {
-        String[] parts = url.trim().split("\\/+");
-        checkArgument(parts.length == 3);
-    }
-
-    /**
-     * Ensure that the url was validated {@link #validateJdbcUrlWithDatabase}.
-     *
-     * @return The array size is fixed at 2, index 0 is base url, and index 1 
is default database.
-     */
-    public static String[] splitDefaultUrl(String defaultUrl) {
-        String[] res = new String[2];
-        int index = defaultUrl.lastIndexOf("/")  + 1;
-        res[0] = defaultUrl.substring(0, index);
-        res[1] = defaultUrl.substring(index, defaultUrl.length());
-        return res;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index 9fc39d054..08d72f3fe 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -25,8 +25,8 @@ public interface JdbcCatalogOptions {
     Option<String> BASE_URL = Options.key("base-url")
         .stringType()
         .noDefaultValue()
-        .withDescription("URL has to be without database, like 
\"jdbc:mysql://localhost:5432/\" or" +
-            "\"jdbc:mysql://localhost:5432\" rather than 
\"jdbc:mysql://localhost:5432/db\"");
+        .withDescription("URL has to be with database, like 
\"jdbc:mysql://localhost:5432/db\" or" +
+            "\"jdbc:mysql://localhost:5432/db?useSSL=true\".");
 
     Option<String> USERNAME =
         Options.key("username")
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 429de5683..c8ac92f6e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -61,12 +61,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         SYS_DATABASES.add("sys");
     }
 
-    public MySqlCatalog(String catalogName, String defaultDatabase, String 
username, String pwd, String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
-    }
-
-    public MySqlCatalog(String catalogName, String username, String pwd, 
String defaultUrl) {
-        super(catalogName, username, pwd, defaultUrl);
+    public MySqlCatalog(String catalogName, String username, String pwd, 
String defaultDatabase, String baseUrl, String defaultUrl) {
+        super(catalogName, username, pwd, defaultDatabase, baseUrl, 
defaultUrl);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
index f3246ab6c..1dcf02382 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
@@ -19,12 +19,16 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 @AutoService(Factory.class)
 public class MySqlCatalogFactory implements CatalogFactory {
 
@@ -35,10 +39,18 @@ public class MySqlCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+        if (!defaultDatabase.isPresent()) {
+            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+        }
         return new MySqlCatalog(catalogName,
             options.get(JdbcCatalogOptions.USERNAME),
             options.get(JdbcCatalogOptions.PASSWORD),
-            options.get(JdbcCatalogOptions.BASE_URL));
+            defaultDatabase.get(),
+            urlInfo.getUrlWithoutDatabase(),
+            urlInfo.getUrlWithDatabase().get());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
deleted file mode 100644
index 251c24d21..000000000
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-public class AbstractJdbcCatalogTest {
-
-    @ParameterizedTest
-    @ValueSource(strings = {"jdbc:mysql://localhost:5432/", 
"jdbc:mysql://localhost:5432", "jdbc:mysql://localhost:5432/", 
"jdbc:postgresql://localhost:5432"})
-    public void testValidateJdbcBaseUrl(String baseUrl) {
-        Assertions.assertDoesNotThrow(() -> 
AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(baseUrl));
-        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
AbstractJdbcCatalog.validateJdbcUrlWithDatabase(baseUrl));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"jdbc:mysql://localhost:5432/db", 
"jdbc:postgresql://localhost:5432/db"})
-    public void testValidateJdbcDefault(String defaultUrl) {
-        Assertions.assertDoesNotThrow(() -> 
AbstractJdbcCatalog.validateJdbcUrlWithDatabase(defaultUrl));
-        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(defaultUrl));
-    }
-
-    @ParameterizedTest
-    @CsvSource({"jdbc:mysql://localhost:5432/db, jdbc:mysql://localhost:5432/, 
db",
-        "jdbc:postgresql://localhost:5432/db, 
jdbc:postgresql://localhost:5432/, db"})
-    public void testSplitDefaultUrl(String defaultUrl, String expectedUrl, 
String expectedDatabase) {
-        Assertions.assertArrayEquals(new String[] {expectedUrl, 
expectedDatabase}, AbstractJdbcCatalog.splitDefaultUrl(defaultUrl));
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 312677112..ba5bb6028 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -30,7 +30,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;

Reply via email to