This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new ddd8f808b [improve][jdbc] Reduce jdbc options configuration (#4218)
ddd8f808b is described below
commit ddd8f808b561a11de3ef307f4b1a2fb9d93385cd
Author: Zongwen Li <[email protected]>
AuthorDate: Sat Feb 25 15:09:43 2023 +0800
[improve][jdbc] Reduce jdbc options configuration (#4218)
---
docs/en/connector-v2/source/MySQL-CDC.md | 21 ++----
.../util/OptionValidationException.java | 8 +++
.../apache/seatunnel/common/utils/JdbcUrlUtil.java | 75 ++++++++++++++++++++++
.../seatunnel/common/utils/JdbcUrlUtilTest.java | 50 +++++++++++++++
.../cdc/mysql/source/MySqlIncrementalSource.java | 11 ++--
.../source/MySqlIncrementalSourceFactory.java | 2 -
.../jdbc/catalog/AbstractJdbcCatalog.java | 62 ++----------------
.../seatunnel/jdbc/catalog/JdbcCatalogOptions.java | 4 +-
.../seatunnel/jdbc/catalog/MySqlCatalog.java | 8 +--
.../jdbc/catalog/MySqlCatalogFactory.java | 14 +++-
.../jdbc/catalog/AbstractJdbcCatalogTest.java | 48 --------------
.../core/parse/MultipleTableJobConfigParser.java | 1 -
12 files changed, 165 insertions(+), 139 deletions(-)
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md
b/docs/en/connector-v2/source/MySQL-CDC.md
index 3bccb2aec..43cbca07d 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -20,11 +20,9 @@ describes how to setup the MySQL CDC connector to run SQL
queries against MySQL
| name | type | required |
default value |
|------------------------------------------------|----------|----------|---------------|
-| hostname | String | Yes | -
|
-| port | Integer | No | 3306
|
| username | String | Yes | -
|
| password | String | Yes | -
|
-| database-names | List | Yes | -
|
+| database-names | List | No | -
|
| table-names | List | Yes | -
|
| base-url | String | Yes | -
|
| startup.mode | Enum | No |
INITIAL |
@@ -48,14 +46,6 @@ describes how to setup the MySQL CDC connector to run SQL
queries against MySQL
| debezium.* | config | No | -
|
| common-options | | no | -
|
-### hostname [String]
-
-IP address or hostname of the database server.
-
-### port [Integer]
-
-Integer port number of the database server.
-
### username [String]
Name of the database to use when connecting to the database server.
@@ -70,12 +60,11 @@ Database name of the database to monitor.
### table-names [List]
-Table name of the database to monitor.
+Table name of the database to monitor. The table name needs to include the
database name, for example: database_name.table_name
### base-url [String]
-URL has to be without database, like "jdbc:mysql://localhost:5432/" or
"jdbc:mariadb://localhost:5432" rather than "
-jdbc:polardb://localhost:5432/db"
+URL has to be with database, like "jdbc:mysql://localhost:5432/db" or
"jdbc:mysql://localhost:5432/db?useSSL=true".
### startup.mode [Enum]
@@ -192,12 +181,10 @@ source {
result_table_name = "fake"
parallelism = 1
server-id = 5656
- port = 56725
- hostname = "127.0.0.1"
username = "mysqluser"
password = "mysqlpw"
table-names = ["inventory_vwyw0n.products"]
- base-url = "jdbc:mysql://localhost:56725"
+ base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
}
}
```
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
index cc2422e09..1ea58af67 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.configuration.util;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
/**
@@ -36,4 +37,11 @@ public class OptionValidationException extends
SeaTunnelRuntimeException {
public OptionValidationException(String formatMessage, Object... args) {
super(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format(formatMessage, args));
}
+
+ public OptionValidationException(Option<?> option) {
+ super(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
+ String.format("The option(\"%s\") is incorrectly configured,
please refer to the doc: %s",
+ option.key(),
+ option.getDescription()));
+ }
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JdbcUrlUtil.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JdbcUrlUtil.java
new file mode 100644
index 000000000..9e2251632
--- /dev/null
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JdbcUrlUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public final class JdbcUrlUtil {
+ private static final Pattern URL_PATTERN =
Pattern.compile("^(?<url>jdbc:.+?//(?<host>.+?):(?<port>\\d+?))(/(?<database>.*?))*(\\?.*)*$");
+
+ private JdbcUrlUtil() {}
+
+ public static JdbcUrlUtil.UrlInfo getUrlInfo(String url) {
+ Matcher matcher = URL_PATTERN.matcher(url);
+ if (matcher.find()) {
+ String urlWithoutDatabase = matcher.group("url");
+ String database = matcher.group("database");
+ return new JdbcUrlUtil.UrlInfo(
+ urlWithoutDatabase,
+ matcher.group("host"),
+ Integer.valueOf(matcher.group("port")),
+ database
+ );
+ }
+ throw new IllegalArgumentException("The jdbc url format is incorrect:
" + url);
+ }
+
+ @Data
+ public static class UrlInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String urlWithoutDatabase;
+ private final String host;
+ private final Integer port;
+ private final String defaultDatabase;
+
+ public UrlInfo(String urlWithoutDatabase,
+ String host,
+ Integer port,
+ String defaultDatabase) {
+ this.urlWithoutDatabase = urlWithoutDatabase;
+ this.host = host;
+ this.port = port;
+ this.defaultDatabase = defaultDatabase;
+ }
+
+ public Optional<String> getUrlWithDatabase() {
+ return StringUtils.isBlank(defaultDatabase) ? Optional.empty() :
+ Optional.of(urlWithoutDatabase + "/" + defaultDatabase);
+ }
+
+ public Optional<String> getDefaultDatabase() {
+ return StringUtils.isBlank(defaultDatabase) ? Optional.empty() :
Optional.of(defaultDatabase);
+ }
+ }
+}
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/JdbcUrlUtilTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/JdbcUrlUtilTest.java
new file mode 100644
index 000000000..952189b1a
--- /dev/null
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/JdbcUrlUtilTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+public class JdbcUrlUtilTest {
+
+ @Test
+ public void testMySQLUrlWithDatabase() {
+ JdbcUrlUtil.UrlInfo urlInfo =
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310/seatunnel?useSSL=true");
+ Assertions.assertTrue(urlInfo.getUrlWithDatabase().isPresent());
+ Assertions.assertTrue(urlInfo.getDefaultDatabase().isPresent());
+ Assertions.assertEquals("seatunnel",
urlInfo.getDefaultDatabase().get());
+ Assertions.assertEquals("jdbc:mysql://192.168.1.1:5310/seatunnel",
urlInfo.getUrlWithDatabase().get());
+ Assertions.assertEquals("jdbc:mysql://192.168.1.1:5310",
urlInfo.getUrlWithoutDatabase());
+ Assertions.assertEquals("192.168.1.1", urlInfo.getHost());
+ Assertions.assertEquals(5310, urlInfo.getPort());
+ Assertions.assertEquals(urlInfo,
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310/seatunnel"));
+ }
+
+ @Test
+ public void testMySQLUrlWithoutDatabase() {
+ JdbcUrlUtil.UrlInfo urlInfo =
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310/");
+ Assertions.assertFalse(urlInfo.getUrlWithDatabase().isPresent());
+ Assertions.assertFalse(urlInfo.getDefaultDatabase().isPresent());
+ Assertions.assertEquals("jdbc:mysql://192.168.1.1:5310",
urlInfo.getUrlWithoutDatabase());
+ Assertions.assertEquals("192.168.1.1", urlInfo.getHost());
+ Assertions.assertEquals(5310, urlInfo.getPort());
+ Assertions.assertEquals(urlInfo,
JdbcUrlUtil.getUrlInfo("jdbc:mysql://192.168.1.1:5310"));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 43734efbd..81d6581e4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -20,11 +20,13 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
@@ -37,7 +39,7 @@ import
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDese
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalogFactory;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
@@ -63,6 +65,9 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
MySqlSourceConfigFactory configFactory = new
MySqlSourceConfigFactory();
configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
configFactory.fromReadonlyConfig(readonlyConfig);
+ JdbcUrlUtil.UrlInfo urlInfo =
JdbcUrlUtil.getUrlInfo(config.get(JdbcCatalogOptions.BASE_URL));
+ configFactory.hostname(urlInfo.getHost());
+ configFactory.port(urlInfo.getPort());
configFactory.startupOptions(startupConfig);
configFactory.stopOptions(stopConfig);
return configFactory;
@@ -71,12 +76,10 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
@SuppressWarnings("unchecked")
@Override
public DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(ReadonlyConfig config) {
- JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
- String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
// TODO: support metadata keys
- MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql",
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(),
jdbcSourceConfig.getPassword(), baseUrl);
+ Catalog mySqlCatalog = new
MySqlCatalogFactory().createCatalog("mysql", config);
CatalogTable table =
mySqlCatalog.getTable(TablePath.of(config.get(CatalogOptions.TABLE_NAMES).get(0)));
physicalRowType = table.getTableSchema().toPhysicalRowDataType();
} else {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index e22148f81..3ee38ab2f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -52,14 +52,12 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
public OptionRule optionRule() {
return JdbcSourceOptions.getBaseRule()
.required(
- JdbcSourceOptions.HOSTNAME,
JdbcSourceOptions.USERNAME,
JdbcSourceOptions.PASSWORD,
CatalogOptions.TABLE_NAMES,
JdbcCatalogOptions.BASE_URL)
.optional(
JdbcSourceOptions.DATABASE_NAMES,
- JdbcSourceOptions.PORT,
JdbcSourceOptions.SERVER_ID,
JdbcSourceOptions.SERVER_TIME_ZONE,
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index e6f64d8f3..b5dbbce45 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -63,76 +63,22 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
public AbstractJdbcCatalog(
String catalogName,
- String defaultDatabase,
String username,
String pwd,
- String baseUrl) {
+ String defaultDatabase,
+ String baseUrl,
+ String defaultUrl) {
checkArgument(StringUtils.isNotBlank(username));
checkArgument(StringUtils.isNotBlank(pwd));
+ checkArgument(StringUtils.isNotBlank(defaultDatabase));
checkArgument(StringUtils.isNotBlank(baseUrl));
-
- baseUrl = baseUrl.trim();
- validateJdbcUrlWithoutDatabase(baseUrl);
this.catalogName = catalogName;
this.defaultDatabase = defaultDatabase;
this.username = username;
this.pwd = pwd;
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
- this.defaultUrl = this.baseUrl + defaultDatabase;
- }
-
- /**
- * URL has to be without database, like "jdbc:mysql://localhost:5432/" or
- * "jdbc:mysql://localhost:5432" rather than
"jdbc:mysql://localhost:5432/db".
- */
- public static void validateJdbcUrlWithoutDatabase(String url) {
- String[] parts = url.trim().split("\\/+");
-
- checkArgument(parts.length == 2);
- }
-
- public AbstractJdbcCatalog(
- String catalogName,
- String username,
- String pwd,
- String defaultUrl) {
-
- checkArgument(StringUtils.isNotBlank(username));
- checkArgument(StringUtils.isNotBlank(pwd));
- checkArgument(StringUtils.isNotBlank(defaultUrl));
-
- defaultUrl = defaultUrl.trim();
- validateJdbcUrlWithDatabase(defaultUrl);
- this.catalogName = catalogName;
- this.username = username;
- this.pwd = pwd;
this.defaultUrl = defaultUrl;
- String[] strings = splitDefaultUrl(defaultUrl);
- this.baseUrl = strings[0];
- this.defaultDatabase = strings[1];
- }
-
- /**
- * URL has to be with database, like "jdbc:mysql://localhost:5432/db"
rather than "jdbc:mysql://localhost:5432/".
- */
- @SuppressWarnings("MagicNumber")
- public static void validateJdbcUrlWithDatabase(String url) {
- String[] parts = url.trim().split("\\/+");
- checkArgument(parts.length == 3);
- }
-
- /**
- * Ensure that the url was validated {@link #validateJdbcUrlWithDatabase}.
- *
- * @return The array size is fixed at 2, index 0 is base url, and index 1
is default database.
- */
- public static String[] splitDefaultUrl(String defaultUrl) {
- String[] res = new String[2];
- int index = defaultUrl.lastIndexOf("/") + 1;
- res[0] = defaultUrl.substring(0, index);
- res[1] = defaultUrl.substring(index, defaultUrl.length());
- return res;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index 9fc39d054..08d72f3fe 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -25,8 +25,8 @@ public interface JdbcCatalogOptions {
Option<String> BASE_URL = Options.key("base-url")
.stringType()
.noDefaultValue()
- .withDescription("URL has to be without database, like
\"jdbc:mysql://localhost:5432/\" or" +
- "\"jdbc:mysql://localhost:5432\" rather than
\"jdbc:mysql://localhost:5432/db\"");
+ .withDescription("URL has to be with database, like
\"jdbc:mysql://localhost:5432/db\" or" +
+ "\"jdbc:mysql://localhost:5432/db?useSSL=true\".");
Option<String> USERNAME =
Options.key("username")
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 429de5683..c8ac92f6e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -61,12 +61,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
SYS_DATABASES.add("sys");
}
- public MySqlCatalog(String catalogName, String defaultDatabase, String
username, String pwd, String baseUrl) {
- super(catalogName, defaultDatabase, username, pwd, baseUrl);
- }
-
- public MySqlCatalog(String catalogName, String username, String pwd,
String defaultUrl) {
- super(catalogName, username, pwd, defaultUrl);
+ public MySqlCatalog(String catalogName, String username, String pwd,
String defaultDatabase, String baseUrl, String defaultUrl) {
+ super(catalogName, username, pwd, defaultDatabase, baseUrl,
defaultUrl);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
index f3246ab6c..1dcf02382 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
@@ -19,12 +19,16 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
@AutoService(Factory.class)
public class MySqlCatalogFactory implements CatalogFactory {
@@ -35,10 +39,18 @@ public class MySqlCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+ JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+ Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+ if (!defaultDatabase.isPresent()) {
+ throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+ }
return new MySqlCatalog(catalogName,
options.get(JdbcCatalogOptions.USERNAME),
options.get(JdbcCatalogOptions.PASSWORD),
- options.get(JdbcCatalogOptions.BASE_URL));
+ defaultDatabase.get(),
+ urlInfo.getUrlWithoutDatabase(),
+ urlInfo.getUrlWithDatabase().get());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
deleted file mode 100644
index 251c24d21..000000000
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-public class AbstractJdbcCatalogTest {
-
- @ParameterizedTest
- @ValueSource(strings = {"jdbc:mysql://localhost:5432/",
"jdbc:mysql://localhost:5432", "jdbc:mysql://localhost:5432/",
"jdbc:postgresql://localhost:5432"})
- public void testValidateJdbcBaseUrl(String baseUrl) {
- Assertions.assertDoesNotThrow(() ->
AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(baseUrl));
- Assertions.assertThrowsExactly(IllegalArgumentException.class, () ->
AbstractJdbcCatalog.validateJdbcUrlWithDatabase(baseUrl));
- }
-
- @ParameterizedTest
- @ValueSource(strings = {"jdbc:mysql://localhost:5432/db",
"jdbc:postgresql://localhost:5432/db"})
- public void testValidateJdbcDefault(String defaultUrl) {
- Assertions.assertDoesNotThrow(() ->
AbstractJdbcCatalog.validateJdbcUrlWithDatabase(defaultUrl));
- Assertions.assertThrowsExactly(IllegalArgumentException.class, () ->
AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(defaultUrl));
- }
-
- @ParameterizedTest
- @CsvSource({"jdbc:mysql://localhost:5432/db, jdbc:mysql://localhost:5432/,
db",
- "jdbc:postgresql://localhost:5432/db,
jdbc:postgresql://localhost:5432/, db"})
- public void testSplitDefaultUrl(String defaultUrl, String expectedUrl,
String expectedDatabase) {
- Assertions.assertArrayEquals(new String[] {expectedUrl,
expectedDatabase}, AbstractJdbcCatalog.splitDefaultUrl(defaultUrl));
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 312677112..ba5bb6028 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -30,7 +30,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;