This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc509aa080 [Improve] Remove catalog tag for config file (#5645)
dc509aa080 is described below

commit dc509aa080a4c89b45534a25e0ccae43115f9faf
Author: hailin0 <[email protected]>
AuthorDate: Thu Oct 19 14:54:09 2023 +0800

    [Improve] Remove catalog tag for config file (#5645)
---
 .../api/table/catalog/CatalogOptions.java          |  2 +
 .../api/table/catalog/CatalogTableUtil.java        | 47 +-------------
 .../api/table/catalog/CatalogTableUtilTest.java    |  6 +-
 .../mongodb/MongodbIncrementalSourceFactory.java   |  2 +-
 .../source/MySqlIncrementalSourceFactory.java      |  2 +-
 .../source/SqlServerIncrementalSourceFactory.java  |  2 +-
 .../catalog/sqlserver/SqlServerCatalogFactory.java |  7 --
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 75 +++++++++-------------
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       | 27 +++++---
 .../seatunnel/jdbc/source/JdbcSourceFactory.java   |  2 +-
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     | 55 ++++++++++++++++
 .../engine/core/parse/ConfigParserUtil.java        | 16 -----
 12 files changed, 116 insertions(+), 127 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
index 326fd2fac0..2d1a3bc41b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 
 public interface CatalogOptions {
+
+    @Deprecated
     Option<Map<String, String>> CATALOG_OPTIONS =
             Options.key("catalog")
                     .mapType()
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index b258faaf4d..1ff83cb9a3 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -68,47 +68,6 @@ public class CatalogTableUtil implements Serializable {
                 "It is converted from RowType and only has column 
information.");
     }
 
-    // TODO remove this method after 
https://github.com/apache/seatunnel/issues/5483 done.
-    @Deprecated
-    public static List<CatalogTable> getCatalogTables(Config config, 
ClassLoader classLoader) {
-        // Highest priority: specified schema
-        if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(config);
-            return Collections.singletonList(catalogTable);
-        }
-
-        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
-        Map<String, String> catalogOptions =
-                
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new 
HashMap<>());
-
-        Map<String, Object> catalogAllOptions = new HashMap<>();
-        catalogAllOptions.putAll(readonlyConfig.toMap());
-        catalogAllOptions.putAll(catalogOptions);
-        ReadonlyConfig catalogConfig = 
ReadonlyConfig.fromMap(catalogAllOptions);
-
-        Optional<Catalog> optionalCatalog =
-                FactoryUtil.createOptionalCatalog(
-                        catalogConfig.get(CatalogOptions.NAME),
-                        catalogConfig,
-                        classLoader,
-                        catalogConfig.get(CommonOptions.FACTORY_ID));
-        return optionalCatalog
-                .map(
-                        c -> {
-                            long startTime = System.currentTimeMillis();
-                            try (Catalog catalog = c) {
-                                catalog.open();
-                                List<CatalogTable> catalogTables = 
catalog.getTables(catalogConfig);
-                                log.info(
-                                        String.format(
-                                                "Get catalog tables, cost 
time: %d ms",
-                                                System.currentTimeMillis() - 
startTime));
-                                return catalogTables;
-                            }
-                        })
-                .orElse(Collections.emptyList());
-    }
-
     /**
      * Get catalog table from config, if schema is specified, return a catalog 
table with specified
      * schema, otherwise, return a catalog table with schema from catalog.
@@ -120,16 +79,16 @@ public class CatalogTableUtil implements Serializable {
      *     </a>
      */
     @Deprecated
-    public static List<CatalogTable> getCatalogTablesFromConfig(
+    public static List<CatalogTable> getCatalogTables(
             ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
 
         // We use plugin_name as factoryId, so MySQL-CDC should be MySQL
         String factoryId = 
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
-        return getCatalogTablesFromConfig(factoryId, readonlyConfig, 
classLoader);
+        return getCatalogTables(factoryId, readonlyConfig, classLoader);
     }
 
     @Deprecated
-    public static List<CatalogTable> getCatalogTablesFromConfig(
+    public static List<CatalogTable> getCatalogTables(
             String factoryId, ReadonlyConfig readonlyConfig, ClassLoader 
classLoader) {
         // Highest priority: specified schema
         Map<String, Object> schemaMap = 
readonlyConfig.get(TableSchemaOptions.SCHEMA);
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index 0149effab1..d3e84205c4 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -104,7 +104,7 @@ public class CatalogTableUtilTest {
         Config source = config.getConfigList("source").get(0);
         ReadonlyConfig sourceReadonlyConfig = 
ReadonlyConfig.fromConfig(source);
         List<CatalogTable> catalogTables =
-                CatalogTableUtil.getCatalogTablesFromConfig(
+                CatalogTableUtil.getCatalogTables(
                         sourceReadonlyConfig, 
Thread.currentThread().getContextClassLoader());
         Assertions.assertEquals(2, catalogTables.size());
         Assertions.assertEquals(
@@ -121,7 +121,7 @@ public class CatalogTableUtilTest {
         Assertions.assertThrows(
                 SeaTunnelException.class,
                 () ->
-                        CatalogTableUtil.getCatalogTablesFromConfig(
+                        CatalogTableUtil.getCatalogTables(
                                 emptyReadonlyConfig,
                                 
Thread.currentThread().getContextClassLoader()));
         // test unknown catalog
@@ -132,7 +132,7 @@ public class CatalogTableUtilTest {
         Assertions.assertThrows(
                 SeaTunnelException.class,
                 () ->
-                        CatalogTableUtil.getCatalogTablesFromConfig(
+                        CatalogTableUtil.getCatalogTables(
                                 cannotFindCatalogReadonlyConfig,
                                 
Thread.currentThread().getContextClassLoader()));
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 7b816ed3eb..761d36f8a2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -74,7 +74,7 @@ public class MongodbIncrementalSourceFactory implements 
TableSourceFactory {
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
             List<CatalogTable> catalogTables =
-                    CatalogTableUtil.getCatalogTablesFromConfig(
+                    CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToDataType(catalogTables);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 60e1105e30..90e76e835c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -94,7 +94,7 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory {
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
             List<CatalogTable> catalogTables =
-                    CatalogTableUtil.getCatalogTablesFromConfig(
+                    CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToDataType(catalogTables);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 7127209aef..41623937af 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -99,7 +99,7 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
             List<CatalogTable> catalogTables =
-                    CatalogTableUtil.getCatalogTablesFromConfig(
+                    CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToDataType(catalogTables);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
index 9ddd035b2a..43bb75dce5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogFactory.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.configuration.util.OptionValidationException;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -28,8 +27,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 
 import com.google.auto.service.AutoService;
 
-import java.util.Optional;
-
 @AutoService(Factory.class)
 public class SqlServerCatalogFactory implements CatalogFactory {
     public static final String IDENTIFIER = "SqlServer";
@@ -43,10 +40,6 @@ public class SqlServerCatalogFactory implements 
CatalogFactory {
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
         String url = options.get(JdbcCatalogOptions.BASE_URL);
         JdbcUrlUtil.UrlInfo urlInfo = SqlServerURLParser.parse(url);
-        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
-        if (!defaultDatabase.isPresent()) {
-            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
-        }
         return new SqlServerCatalog(
                 catalogName,
                 options.get(JdbcCatalogOptions.USERNAME),
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index e2eb971105..8507b51330 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -31,10 +30,9 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -48,6 +46,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialecten
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -55,13 +54,10 @@ import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
-import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 
 @AutoService(SeaTunnelSink.class)
 public class JdbcSink
@@ -199,44 +195,37 @@ public class JdbcSink
     @Override
     public void handleSaveMode(DataSaveMode saveMode) {
         if (catalogTable != null) {
-            Map<String, String> catalogOptions = 
config.get(CatalogOptions.CATALOG_OPTIONS);
-            if (catalogOptions != null) {
-                String factoryId = 
catalogOptions.get(CommonOptions.FACTORY_ID.key());
-                if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
-                    return;
-                }
-                CatalogFactory catalogFactory =
-                        discoverFactory(
-                                Thread.currentThread().getContextClassLoader(),
-                                CatalogFactory.class,
-                                factoryId);
-                if (catalogFactory != null) {
-                    try (Catalog catalog =
-                            catalogFactory.createCatalog(
-                                    catalogFactory.factoryIdentifier(),
-                                    ReadonlyConfig.fromMap(new 
HashMap<>(catalogOptions)))) {
-                        catalog.open();
-                        FieldIdeEnum fieldIdeEnumEnum = 
config.get(JdbcOptions.FIELD_IDE);
-                        String fieldIde =
-                                fieldIdeEnumEnum == null
-                                        ? FieldIdeEnum.ORIGINAL.getValue()
-                                        : fieldIdeEnumEnum.getValue();
-                        TablePath tablePath =
-                                TablePath.of(
-                                        jdbcSinkConfig.getDatabase()
-                                                + "."
-                                                + 
CatalogUtils.quoteTableIdentifier(
-                                                        
jdbcSinkConfig.getTable(), fieldIde));
-                        if 
(!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
-                            catalog.createDatabase(tablePath, true);
-                        }
-                        catalogTable.getOptions().put("fieldIde", fieldIde);
-                        if (!catalog.tableExists(tablePath)) {
-                            catalog.createTable(tablePath, catalogTable, true);
-                        }
-                    } catch (Exception e) {
-                        throw new 
JdbcConnectorException(HANDLE_SAVE_MODE_FAILED, e);
+            if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
+                return;
+            }
+            Optional<Catalog> catalogOptional =
+                    
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
+            if (catalogOptional.isPresent()) {
+                try (Catalog catalog = catalogOptional.get()) {
+                    catalog.open();
+                    FieldIdeEnum fieldIdeEnumEnum = 
config.get(JdbcOptions.FIELD_IDE);
+                    String fieldIde =
+                            fieldIdeEnumEnum == null
+                                    ? FieldIdeEnum.ORIGINAL.getValue()
+                                    : fieldIdeEnumEnum.getValue();
+                    TablePath tablePath =
+                            TablePath.of(
+                                    jdbcSinkConfig.getDatabase()
+                                            + "."
+                                            + 
CatalogUtils.quoteTableIdentifier(
+                                                    jdbcSinkConfig.getTable(), 
fieldIde));
+                    if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) 
{
+                        catalog.createDatabase(tablePath, true);
+                    }
+                    catalogTable.getOptions().put("fieldIde", fieldIde);
+                    if (!catalog.tableExists(tablePath)) {
+                        catalog.createTable(tablePath, catalogTable, true);
                     }
+                } catch (UnsupportedOperationException | CatalogException e) {
+                    // TODO Temporary fix, this feature has been changed in 
this pr
+                    // https://github.com/apache/seatunnel/pull/5645
+                } catch (Exception e) {
+                    throw new JdbcConnectorException(HANDLE_SAVE_MODE_FAILED, 
e);
                 }
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 6c63370592..540094571d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -73,16 +73,26 @@ public class JdbcSinkFactory implements TableSinkFactory {
         return "Jdbc";
     }
 
+    private ReadonlyConfig getCatalogOptions(TableSinkFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        // TODO Remove obsolete code
+        Optional<Map<String, String>> catalogOptions =
+                config.getOptional(CatalogOptions.CATALOG_OPTIONS);
+        if (catalogOptions.isPresent()) {
+            return ReadonlyConfig.fromMap(new HashMap<>(catalogOptions.get()));
+        }
+        return config;
+    }
+
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
-        Map<String, String> catalogOptions = 
config.get(CatalogOptions.CATALOG_OPTIONS);
+        ReadonlyConfig catalogOptions = getCatalogOptions(context);
         Optional<String> optionalTable = config.getOptional(TABLE);
         if (!optionalTable.isPresent()) {
-            catalogOptions = catalogOptions == null ? new HashMap<>() : 
catalogOptions;
-            String prefix = 
catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX.key());
-            String suffix = 
catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX.key());
+            String prefix = 
catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
+            String suffix = 
catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
             if (StringUtils.isNotEmpty(prefix) || 
StringUtils.isNotEmpty(suffix)) {
                 TableIdentifier tableId = catalogTable.getTableId();
                 String tableName =
@@ -106,12 +116,9 @@ public class JdbcSinkFactory implements TableSinkFactory {
                                 catalogTable.getCatalogName());
             }
             Map<String, String> map = config.toMap();
-            if 
(StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA.key()))) {
-                map.put(
-                        TABLE.key(),
-                        catalogOptions.get(JdbcCatalogOptions.SCHEMA.key())
-                                + "."
-                                + catalogTable.getTableId().getTableName());
+            String schema = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
+            if (StringUtils.isNotBlank(schema)) {
+                map.put(TABLE.key(), schema + "." + 
catalogTable.getTableId().getTableName());
             } else if 
(StringUtils.isNotBlank(catalogTable.getTableId().getSchemaName())) {
                 map.put(
                         TABLE.key(),
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 4fc5f6b189..a76cdffac9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -100,7 +100,7 @@ public class JdbcSourceFactory implements 
TableSourceFactory {
         Optional<PartitionParameter> partitionParameter = Optional.empty();
         try {
             CatalogTable catalogTable =
-                    CatalogTableUtil.getCatalogTablesFromConfig(
+                    CatalogTableUtil.getCatalogTables(
                                     dialect.dialectName(),
                                     context.getOptions(),
                                     context.getClassLoader())
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
new file mode 100644
index 0000000000..3f19d76d01
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+public class JdbcCatalogUtils {
+    private static final String DEFAULT_CATALOG = "default";
+
+    private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig 
config) {
+        Map<String, Object> catalogConfig = new HashMap<>();
+        catalogConfig.put(JdbcCatalogOptions.BASE_URL.key(), config.getUrl());
+        config.getUsername()
+                .ifPresent(val -> 
catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val));
+        config.getPassword()
+                .ifPresent(val -> 
catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
+        return ReadonlyConfig.fromMap(catalogConfig);
+    }
+
+    public static Optional<Catalog> findCatalog(JdbcConnectionConfig config, 
JdbcDialect dialect) {
+        ReadonlyConfig catalogConfig = extractCatalogConfig(config);
+        return FactoryUtil.createOptionalCatalog(
+                dialect.dialectName(),
+                catalogConfig,
+                JdbcCatalogUtils.class.getClassLoader(),
+                dialect.dialectName());
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
index fe1185c960..1bb0b5c57f 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionValidationException;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
-import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -38,7 +36,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.seatunnel.api.common.CommonOptions.FACTORY_ID;
@@ -61,22 +58,9 @@ public final class ConfigParserUtil {
                 FactoryUtil.getFactoryUrl(
                         FactoryUtil.discoverFactory(classLoader, factoryClass, 
factoryId));
         factoryUrls.add(factoryUrl);
-        getCatalogFactoryUrl(readonlyConfig, 
classLoader).ifPresent(factoryUrls::add);
         return factoryUrls;
     }
 
-    private static Optional<URL> getCatalogFactoryUrl(
-            ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
-        Map<String, String> catalogOptions =
-                
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new 
HashMap<>());
-        // TODO: fallback key
-        String factoryId =
-                catalogOptions.getOrDefault(FACTORY_ID.key(), 
readonlyConfig.get(PLUGIN_NAME));
-        Optional<CatalogFactory> optionalFactory =
-                FactoryUtil.discoverOptionalFactory(classLoader, 
CatalogFactory.class, factoryId);
-        return optionalFactory.map(FactoryUtil::getFactoryUrl);
-    }
-
     public static void checkGraph(
             List<? extends Config> sources,
             List<? extends Config> transforms,

Reply via email to