This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dc509aa080 [Improve] Remove catalog tag for config file (#5645)
dc509aa080 is described below
commit dc509aa080a4c89b45534a25e0ccae43115f9faf
Author: hailin0 <[email protected]>
AuthorDate: Thu Oct 19 14:54:09 2023 +0800
[Improve] Remove catalog tag for config file (#5645)
---
.../api/table/catalog/CatalogOptions.java | 2 +
.../api/table/catalog/CatalogTableUtil.java | 47 +-------------
.../api/table/catalog/CatalogTableUtilTest.java | 6 +-
.../mongodb/MongodbIncrementalSourceFactory.java | 2 +-
.../source/MySqlIncrementalSourceFactory.java | 2 +-
.../source/SqlServerIncrementalSourceFactory.java | 2 +-
.../catalog/sqlserver/SqlServerCatalogFactory.java | 7 --
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 75 +++++++++-------------
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 27 +++++---
.../seatunnel/jdbc/source/JdbcSourceFactory.java | 2 +-
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 55 ++++++++++++++++
.../engine/core/parse/ConfigParserUtil.java | 16 -----
12 files changed, 116 insertions(+), 127 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
index 326fd2fac0..2d1a3bc41b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
public interface CatalogOptions {
+
+ @Deprecated
Option<Map<String, String>> CATALOG_OPTIONS =
Options.key("catalog")
.mapType()
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index b258faaf4d..1ff83cb9a3 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -68,47 +68,6 @@ public class CatalogTableUtil implements Serializable {
"It is converted from RowType and only has column
information.");
}
- // TODO remove this method after
https://github.com/apache/seatunnel/issues/5483 done.
- @Deprecated
- public static List<CatalogTable> getCatalogTables(Config config,
ClassLoader classLoader) {
- // Highest priority: specified schema
- if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
- CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(config);
- return Collections.singletonList(catalogTable);
- }
-
- ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
- Map<String, String> catalogOptions =
-
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new
HashMap<>());
-
- Map<String, Object> catalogAllOptions = new HashMap<>();
- catalogAllOptions.putAll(readonlyConfig.toMap());
- catalogAllOptions.putAll(catalogOptions);
- ReadonlyConfig catalogConfig =
ReadonlyConfig.fromMap(catalogAllOptions);
-
- Optional<Catalog> optionalCatalog =
- FactoryUtil.createOptionalCatalog(
- catalogConfig.get(CatalogOptions.NAME),
- catalogConfig,
- classLoader,
- catalogConfig.get(CommonOptions.FACTORY_ID));
- return optionalCatalog
- .map(
- c -> {
- long startTime = System.currentTimeMillis();
- try (Catalog catalog = c) {
- catalog.open();
- List<CatalogTable> catalogTables =
catalog.getTables(catalogConfig);
- log.info(
- String.format(
- "Get catalog tables, cost
time: %d ms",
- System.currentTimeMillis() -
startTime));
- return catalogTables;
- }
- })
- .orElse(Collections.emptyList());
- }
-
/**
* Get catalog table from config, if schema is specified, return a catalog
table with specified
* schema, otherwise, return a catalog table with schema from catalog.
@@ -120,16 +79,16 @@ public class CatalogTableUtil implements Serializable {
* </a>
*/
@Deprecated
- public static List<CatalogTable> getCatalogTablesFromConfig(
+ public static List<CatalogTable> getCatalogTables(
ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
// We use plugin_name as factoryId, so MySQL-CDC should be MySQL
String factoryId =
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
- return getCatalogTablesFromConfig(factoryId, readonlyConfig,
classLoader);
+ return getCatalogTables(factoryId, readonlyConfig, classLoader);
}
@Deprecated
- public static List<CatalogTable> getCatalogTablesFromConfig(
+ public static List<CatalogTable> getCatalogTables(
String factoryId, ReadonlyConfig readonlyConfig, ClassLoader
classLoader) {
// Highest priority: specified schema
Map<String, Object> schemaMap =
readonlyConfig.get(TableSchemaOptions.SCHEMA);
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index 0149effab1..d3e84205c4 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -104,7 +104,7 @@ public class CatalogTableUtilTest {
Config source = config.getConfigList("source").get(0);
ReadonlyConfig sourceReadonlyConfig =
ReadonlyConfig.fromConfig(source);
List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
sourceReadonlyConfig,
Thread.currentThread().getContextClassLoader());
Assertions.assertEquals(2, catalogTables.size());
Assertions.assertEquals(
@@ -121,7 +121,7 @@ public class CatalogTableUtilTest {
Assertions.assertThrows(
SeaTunnelException.class,
() ->
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
emptyReadonlyConfig,
Thread.currentThread().getContextClassLoader()));
// test unknown catalog
@@ -132,7 +132,7 @@ public class CatalogTableUtilTest {
Assertions.assertThrows(
SeaTunnelException.class,
() ->
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
cannotFindCatalogReadonlyConfig,
Thread.currentThread().getContextClassLoader()));
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 7b816ed3eb..761d36f8a2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -74,7 +74,7 @@ public class MongodbIncrementalSourceFactory implements
TableSourceFactory {
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 60e1105e30..90e76e835c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -94,7 +94,7 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory {
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 7127209aef..41623937af 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -99,7 +99,7 @@ public class SqlServerIncrementalSourceFactory implements
TableSourceFactory {
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToDataType(catalogTables);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
index 9ddd035b2a..43bb75dce5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.configuration.util.OptionValidationException;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -28,8 +27,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import com.google.auto.service.AutoService;
-import java.util.Optional;
-
@AutoService(Factory.class)
public class SqlServerCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "SqlServer";
@@ -43,10 +40,6 @@ public class SqlServerCatalogFactory implements
CatalogFactory {
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
String url = options.get(JdbcCatalogOptions.BASE_URL);
JdbcUrlUtil.UrlInfo urlInfo = SqlServerURLParser.parse(url);
- Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
- if (!defaultDatabase.isPresent()) {
- throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
- }
return new SqlServerCatalog(
catalogName,
options.get(JdbcCatalogOptions.USERNAME),
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index e2eb971105..8507b51330 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -31,10 +30,9 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -48,6 +46,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialecten
import
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
import org.apache.commons.lang3.StringUtils;
@@ -55,13 +54,10 @@ import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
-import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
@AutoService(SeaTunnelSink.class)
public class JdbcSink
@@ -199,44 +195,37 @@ public class JdbcSink
@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
- Map<String, String> catalogOptions =
config.get(CatalogOptions.CATALOG_OPTIONS);
- if (catalogOptions != null) {
- String factoryId =
catalogOptions.get(CommonOptions.FACTORY_ID.key());
- if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
- return;
- }
- CatalogFactory catalogFactory =
- discoverFactory(
- Thread.currentThread().getContextClassLoader(),
- CatalogFactory.class,
- factoryId);
- if (catalogFactory != null) {
- try (Catalog catalog =
- catalogFactory.createCatalog(
- catalogFactory.factoryIdentifier(),
- ReadonlyConfig.fromMap(new
HashMap<>(catalogOptions)))) {
- catalog.open();
- FieldIdeEnum fieldIdeEnumEnum =
config.get(JdbcOptions.FIELD_IDE);
- String fieldIde =
- fieldIdeEnumEnum == null
- ? FieldIdeEnum.ORIGINAL.getValue()
- : fieldIdeEnumEnum.getValue();
- TablePath tablePath =
- TablePath.of(
- jdbcSinkConfig.getDatabase()
- + "."
- +
CatalogUtils.quoteTableIdentifier(
-
jdbcSinkConfig.getTable(), fieldIde));
- if
(!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
- catalog.createDatabase(tablePath, true);
- }
- catalogTable.getOptions().put("fieldIde", fieldIde);
- if (!catalog.tableExists(tablePath)) {
- catalog.createTable(tablePath, catalogTable, true);
- }
- } catch (Exception e) {
- throw new
JdbcConnectorException(HANDLE_SAVE_MODE_FAILED, e);
+ if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
+ return;
+ }
+ Optional<Catalog> catalogOptional =
+
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
+ if (catalogOptional.isPresent()) {
+ try (Catalog catalog = catalogOptional.get()) {
+ catalog.open();
+ FieldIdeEnum fieldIdeEnumEnum =
config.get(JdbcOptions.FIELD_IDE);
+ String fieldIde =
+ fieldIdeEnumEnum == null
+ ? FieldIdeEnum.ORIGINAL.getValue()
+ : fieldIdeEnumEnum.getValue();
+ TablePath tablePath =
+ TablePath.of(
+ jdbcSinkConfig.getDatabase()
+ + "."
+ +
CatalogUtils.quoteTableIdentifier(
+ jdbcSinkConfig.getTable(),
fieldIde));
+ if (!catalog.databaseExists(jdbcSinkConfig.getDatabase()))
{
+ catalog.createDatabase(tablePath, true);
+ }
+ catalogTable.getOptions().put("fieldIde", fieldIde);
+ if (!catalog.tableExists(tablePath)) {
+ catalog.createTable(tablePath, catalogTable, true);
}
+ } catch (UnsupportedOperationException | CatalogException e) {
+ // TODO Temporary fix, this feature has been changed in
this pr
+ // https://github.com/apache/seatunnel/pull/5645
+ } catch (Exception e) {
+ throw new JdbcConnectorException(HANDLE_SAVE_MODE_FAILED,
e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 6c63370592..540094571d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -73,16 +73,26 @@ public class JdbcSinkFactory implements TableSinkFactory {
return "Jdbc";
}
+ private ReadonlyConfig getCatalogOptions(TableSinkFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ // TODO Remove obsolete code
+ Optional<Map<String, String>> catalogOptions =
+ config.getOptional(CatalogOptions.CATALOG_OPTIONS);
+ if (catalogOptions.isPresent()) {
+ return ReadonlyConfig.fromMap(new HashMap<>(catalogOptions.get()));
+ }
+ return config;
+ }
+
@Override
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
- Map<String, String> catalogOptions =
config.get(CatalogOptions.CATALOG_OPTIONS);
+ ReadonlyConfig catalogOptions = getCatalogOptions(context);
Optional<String> optionalTable = config.getOptional(TABLE);
if (!optionalTable.isPresent()) {
- catalogOptions = catalogOptions == null ? new HashMap<>() :
catalogOptions;
- String prefix =
catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX.key());
- String suffix =
catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX.key());
+ String prefix =
catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
+ String suffix =
catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
if (StringUtils.isNotEmpty(prefix) ||
StringUtils.isNotEmpty(suffix)) {
TableIdentifier tableId = catalogTable.getTableId();
String tableName =
@@ -106,12 +116,9 @@ public class JdbcSinkFactory implements TableSinkFactory {
catalogTable.getCatalogName());
}
Map<String, String> map = config.toMap();
- if
(StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA.key()))) {
- map.put(
- TABLE.key(),
- catalogOptions.get(JdbcCatalogOptions.SCHEMA.key())
- + "."
- + catalogTable.getTableId().getTableName());
+ String schema = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
+ if (StringUtils.isNotBlank(schema)) {
+ map.put(TABLE.key(), schema + "." +
catalogTable.getTableId().getTableName());
} else if
(StringUtils.isNotBlank(catalogTable.getTableId().getSchemaName())) {
map.put(
TABLE.key(),
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 4fc5f6b189..a76cdffac9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -100,7 +100,7 @@ public class JdbcSourceFactory implements
TableSourceFactory {
Optional<PartitionParameter> partitionParameter = Optional.empty();
try {
CatalogTable catalogTable =
- CatalogTableUtil.getCatalogTablesFromConfig(
+ CatalogTableUtil.getCatalogTables(
dialect.dialectName(),
context.getOptions(),
context.getClassLoader())
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
new file mode 100644
index 0000000000..3f19d76d01
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+public class JdbcCatalogUtils {
+ private static final String DEFAULT_CATALOG = "default";
+
+ private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig
config) {
+ Map<String, Object> catalogConfig = new HashMap<>();
+ catalogConfig.put(JdbcCatalogOptions.BASE_URL.key(), config.getUrl());
+ config.getUsername()
+ .ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val));
+ config.getPassword()
+ .ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
+ return ReadonlyConfig.fromMap(catalogConfig);
+ }
+
+ public static Optional<Catalog> findCatalog(JdbcConnectionConfig config,
JdbcDialect dialect) {
+ ReadonlyConfig catalogConfig = extractCatalogConfig(config);
+ return FactoryUtil.createOptionalCatalog(
+ dialect.dialectName(),
+ catalogConfig,
+ JdbcCatalogUtils.class.getClassLoader(),
+ dialect.dialectName());
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
index fe1185c960..1bb0b5c57f 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionValidationException;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
-import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -38,7 +36,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.apache.seatunnel.api.common.CommonOptions.FACTORY_ID;
@@ -61,22 +58,9 @@ public final class ConfigParserUtil {
FactoryUtil.getFactoryUrl(
FactoryUtil.discoverFactory(classLoader, factoryClass,
factoryId));
factoryUrls.add(factoryUrl);
- getCatalogFactoryUrl(readonlyConfig,
classLoader).ifPresent(factoryUrls::add);
return factoryUrls;
}
- private static Optional<URL> getCatalogFactoryUrl(
- ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
- Map<String, String> catalogOptions =
-
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new
HashMap<>());
- // TODO: fallback key
- String factoryId =
- catalogOptions.getOrDefault(FACTORY_ID.key(),
readonlyConfig.get(PLUGIN_NAME));
- Optional<CatalogFactory> optionalFactory =
- FactoryUtil.discoverOptionalFactory(classLoader,
CatalogFactory.class, factoryId);
- return optionalFactory.map(FactoryUtil::getFactoryUrl);
- }
-
public static void checkGraph(
List<? extends Config> sources,
List<? extends Config> transforms,