This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cae66b6221 [Feature][Catalog] Add InMemoryCatalog for test and add new 
getCatalogTableFromConfig method (#5485)
cae66b6221 is described below

commit cae66b6221919c20f1f48f763eb21c4806f32551
Author: Jia Fan <[email protected]>
AuthorDate: Tue Sep 19 16:31:40 2023 +0800

    [Feature][Catalog] Add InMemoryCatalog for test and add new 
getCatalogTableFromConfig method (#5485)
---
 .../seatunnel/api/table/catalog/Catalog.java       |   2 +-
 .../api/table/catalog/CatalogOptions.java          |   9 -
 .../api/table/catalog/CatalogTableUtil.java        |  62 ++++++
 .../api/table/catalog/TableIdentifier.java         |   8 +
 .../api/table/catalog/CatalogTableUtilTest.java    |  57 +++--
 .../api/table/catalog/InMemoryCatalog.java         | 243 +++++++++++++++++++++
 .../api/table/catalog/InMemoryCatalogFactory.java  |  46 ++++
 .../table/catalog/InMemoryCatalogOptionRule.java   |  36 +++
 .../src/test/resources/conf/getCatalogTable.conf   |  44 ++++
 9 files changed, 484 insertions(+), 23 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index c84d6684a9..12e485b659 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -125,7 +125,7 @@ public interface Catalog extends AutoCloseable {
         // Get the list of specified tables
         List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
         List<CatalogTable> catalogTables = new ArrayList<>();
-        if (tableNames != null && tableNames.size() >= 1) {
+        if (tableNames != null && !tableNames.isEmpty()) {
             for (String tableName : tableNames) {
                 TablePath tablePath = TablePath.of(tableName);
                 if (this.tableExists(tablePath)) {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
index 4e8c6e68fb..326fd2fac0 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
@@ -17,10 +17,8 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
-import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
 
 import java.util.List;
 import java.util.Map;
@@ -56,11 +54,4 @@ public interface CatalogOptions {
                     .withDescription(
                             "The table names RegEx of the database to capture."
                                     + "The table name needs to include the 
database name, for example: database_.*\\.table_.*");
-
-    OptionRule.Builder BASE_RULE =
-            OptionRule.builder()
-                    .optional(CommonOptions.FACTORY_ID)
-                    .optional(NAME)
-                    .optional(DATABASE_PATTERN)
-                    .exclusive(TABLE_PATTERN, TABLE_NAMES);
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 0ab5e8799b..07c3473bff 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -92,6 +93,8 @@ public class CatalogTableUtil implements Serializable {
                 "It is converted from RowType and only has column 
information.");
     }
 
+    // TODO remove this method after 
https://github.com/apache/seatunnel/issues/5483 done.
+    @Deprecated
     public static List<CatalogTable> getCatalogTables(Config config, 
ClassLoader classLoader) {
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
         Map<String, String> catalogOptions =
@@ -132,6 +135,65 @@ public class CatalogTableUtil implements Serializable {
                 .orElse(Collections.emptyList());
     }
 
+    /**
+     * Get catalog table from config, if schema is specified, return a catalog 
table with specified
+     * schema, otherwise, return a catalog table with schema from catalog.
+     *
+     * @deprecated DO NOT invoke it in any new 
TableSourceFactory/TableSinkFactory, please directly
+     *     use TableSourceFactory/TableSinkFactory instance to get 
CatalogTable. We just use it to
+     *     transition the old CatalogTable creation logic. Details please <a
+     *     
href="https://cwiki.apache.org/confluence/display/SEATUNNEL/STIP5-Refactor+Catalog+and+CatalogTable";>check
+     *     </a>
+     */
+    @Deprecated
+    public static List<CatalogTable> getCatalogTablesFromConfig(
+            Config config, ClassLoader classLoader) {
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+
+        // We use plugin_name as factoryId, so MySQL-CDC should be MySQL
+        String factoryId = 
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
+        // Highest priority: specified schema
+        Map<String, String> schemaMap = 
readonlyConfig.get(CatalogTableUtil.SCHEMA);
+        if (schemaMap != null) {
+            if (schemaMap.isEmpty()) {
+                throw new SeaTunnelException("Schema config can not be empty");
+            }
+            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(config).getCatalogTable();
+            return Collections.singletonList(catalogTable);
+        }
+
+        Optional<Catalog> optionalCatalog =
+                FactoryUtil.createOptionalCatalog(
+                        factoryId, readonlyConfig, classLoader, factoryId);
+        return optionalCatalog
+                .map(
+                        c -> {
+                            long startTime = System.currentTimeMillis();
+                            try (Catalog catalog = c) {
+                                catalog.open();
+                                List<CatalogTable> catalogTables =
+                                        catalog.getTables(readonlyConfig);
+                                log.info(
+                                        String.format(
+                                                "Get catalog tables, cost 
time: %d",
+                                                System.currentTimeMillis() - 
startTime));
+                                if (catalogTables.isEmpty()) {
+                                    throw new SeaTunnelException(
+                                            String.format(
+                                                    "Can not find catalog 
table with factoryId [%s]",
+                                                    factoryId));
+                                }
+                                return catalogTables;
+                            }
+                        })
+                .orElseThrow(
+                        () ->
+                                new SeaTunnelException(
+                                        String.format(
+                                                "Can not find catalog with 
factoryId [%s]",
+                                                factoryId)));
+    }
+
     public static CatalogTableUtil buildWithConfig(Config config) {
         CheckResult checkResult = CheckConfigUtil.checkAllExists(config, 
"schema");
         if (!checkResult.isSuccess()) {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index 335ffebc4b..2d39f9b984 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -41,6 +41,14 @@ public final class TableIdentifier implements Serializable {
         return new TableIdentifier(catalogName, databaseName, null, tableName);
     }
 
+    public static TableIdentifier of(String catalogName, TablePath tablePath) {
+        return new TableIdentifier(
+                catalogName,
+                tablePath.getDatabaseName(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
     public static TableIdentifier of(
             String catalogName, String databaseName, String schemaName, String 
tableName) {
         return new TableIdentifier(catalogName, databaseName, schemaName, 
tableName);
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index 3d8c8344cd..97df47f555 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.catalog;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -37,17 +38,17 @@ import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.seatunnel.api.table.catalog.CatalogOptions.TABLE_NAMES;
+import static 
org.apache.seatunnel.common.constants.CollectionConstants.PLUGIN_NAME;
 
 public class CatalogTableUtilTest {
     @Test
     public void testSimpleSchemaParse() throws FileNotFoundException, 
URISyntaxException {
         String path = getTestConfigFile("/conf/simple.schema.conf");
-        Config config =
-                ConfigFactory.parseFile(new File(path))
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config config = ConfigFactory.parseFile(new File(path));
         SeaTunnelRowType seaTunnelRowType =
                 CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
         Assertions.assertNotNull(seaTunnelRowType);
@@ -61,12 +62,7 @@ public class CatalogTableUtilTest {
     @Test
     public void testComplexSchemaParse() throws FileNotFoundException, 
URISyntaxException {
         String path = getTestConfigFile("/conf/complex.schema.conf");
-        Config config =
-                ConfigFactory.parseFile(new File(path))
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config config = ConfigFactory.parseFile(new File(path));
         SeaTunnelRowType seaTunnelRowType =
                 CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
         Assertions.assertNotNull(seaTunnelRowType);
@@ -88,6 +84,41 @@ public class CatalogTableUtilTest {
                 "row", 
nestedRowFieldType.getFieldName(nestedRowFieldType.indexOf("row")));
     }
 
+    @Test
+    public void testCatalogUtilGetCatalogTable() throws FileNotFoundException, 
URISyntaxException {
+        String path = getTestConfigFile("/conf/getCatalogTable.conf");
+        Config config = ConfigFactory.parseFile(new File(path));
+        Config source = config.getConfigList("source").get(0);
+        List<CatalogTable> catalogTables =
+                CatalogTableUtil.getCatalogTablesFromConfig(
+                        source, 
Thread.currentThread().getContextClassLoader());
+        Assertions.assertEquals(2, catalogTables.size());
+        Assertions.assertEquals(
+                TableIdentifier.of("InMemory", 
TablePath.of("st.public.table1")),
+                catalogTables.get(0).getTableId());
+        Assertions.assertEquals(
+                TableIdentifier.of("InMemory", 
TablePath.of("st.public.table2")),
+                catalogTables.get(1).getTableId());
+        // test empty tables
+        Config emptyTableSource =
+                source.withValue(
+                        TABLE_NAMES.key(), ConfigValueFactory.fromIterable(new 
ArrayList<>()));
+        Assertions.assertThrows(
+                SeaTunnelException.class,
+                () ->
+                        CatalogTableUtil.getCatalogTablesFromConfig(
+                                emptyTableSource, 
Thread.currentThread().getContextClassLoader()));
+        // test unknown catalog
+        Config cannotFindCatalogSource =
+                source.withValue(PLUGIN_NAME, 
ConfigValueFactory.fromAnyRef("unknownCatalog"));
+        Assertions.assertThrows(
+                SeaTunnelException.class,
+                () ->
+                        CatalogTableUtil.getCatalogTablesFromConfig(
+                                cannotFindCatalogSource,
+                                
Thread.currentThread().getContextClassLoader()));
+    }
+
     public static String getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         URL resource = CatalogTableUtilTest.class.getResource(configFile);
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
new file mode 100644
index 0000000000..8a9a9ddef4
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class InMemoryCatalog implements Catalog {
+    private final ReadonlyConfig options;
+    private final String name;
+    // database -> tables
+    private final Map<String, List<CatalogTable>> catalogTables;
+    private static final String DEFAULT_DATABASE = "default";
+
+    InMemoryCatalog(String catalogName, ReadonlyConfig options) {
+        this.name = catalogName;
+        this.options = options;
+        this.catalogTables = new HashMap<>();
+        addDefaultTable();
+    }
+
+    // Add some default table for testing
+    private void addDefaultTable() {
+        this.catalogTables.put(DEFAULT_DATABASE, new ArrayList<>());
+        List<CatalogTable> tables = new ArrayList<>();
+        this.catalogTables.put("st", tables);
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .column(PhysicalColumn.of("id", BasicType.LONG_TYPE, 
22, false, null, "id"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "name", BasicType.STRING_TYPE, 128, 
false, null, "name"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "age", BasicType.INT_TYPE, null, true, 
null, "age"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "createTime",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        3,
+                                        true,
+                                        null,
+                                        "createTime"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "lastUpdateTime",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        3,
+                                        true,
+                                        null,
+                                        "lastUpdateTime"))
+                        .primaryKey(PrimaryKey.of("id", 
Lists.newArrayList("id")))
+                        .constraintKey(
+                                ConstraintKey.of(
+                                        ConstraintKey.ConstraintType.KEY,
+                                        "name",
+                                        Lists.newArrayList(
+                                                
ConstraintKey.ConstraintKeyColumn.of(
+                                                        "name", null))))
+                        .build();
+        CatalogTable catalogTable1 =
+                CatalogTable.of(
+                        TableIdentifier.of(name, TablePath.of("st", "public", 
"table1")),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "In Memory Table");
+        CatalogTable catalogTable2 =
+                CatalogTable.of(
+                        TableIdentifier.of(name, TablePath.of("st", "public", 
"table2")),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "In Memory Table",
+                        name);
+        tables.add(catalogTable1);
+        tables.add(catalogTable2);
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        String username = options.get(InMemoryCatalogOptionRule.username);
+        String password = options.get(InMemoryCatalogOptionRule.password);
+        String host = options.get(InMemoryCatalogOptionRule.host);
+        int port = options.get(InMemoryCatalogOptionRule.port);
+        log.trace(
+                String.format(
+                        "InMemoryCatalog %s opening with %s/%s in %s:%s",
+                        name, username, password, host, port));
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        log.trace(String.format("InMemoryCatalog %s closing", name));
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return DEFAULT_DATABASE;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return catalogTables.containsKey(databaseName);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return new ArrayList<>(catalogTables.keySet());
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        return catalogTables.get(databaseName).stream()
+                .map(
+                        table ->
+                                table.getTableId().getSchemaName()
+                                        + "."
+                                        + table.getTableId().getTableName())
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            List<CatalogTable> tables = 
catalogTables.get(tablePath.getDatabaseName());
+            return tables.stream().anyMatch(t -> 
t.getTableId().toTablePath().equals(tablePath));
+        }
+        return false;
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            List<CatalogTable> tables = 
catalogTables.get(tablePath.getDatabaseName());
+            return tables.stream()
+                    .filter(t -> 
t.getTableId().toTablePath().equals(tablePath))
+                    .findFirst()
+                    .orElseThrow(() -> new TableNotExistException(name, 
tablePath));
+        } else {
+            throw new TableNotExistException(name, tablePath);
+        }
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            List<CatalogTable> tables = 
catalogTables.get(tablePath.getDatabaseName());
+            if (tables.stream().anyMatch(t -> 
t.getTableId().toTablePath().equals(tablePath))) {
+                if (ignoreIfExists) {
+                    log.debug("Table {} already exists, ignore", 
tablePath.getFullName());
+                } else {
+                    throw new TableAlreadyExistException(name, tablePath);
+                }
+            } else {
+                tables.add(table);
+            }
+        } else {
+            throw new DatabaseNotExistException(name, 
tablePath.getDatabaseName());
+        }
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            List<CatalogTable> tables = 
catalogTables.get(tablePath.getDatabaseName());
+            if (tables.stream().anyMatch(t -> 
t.getTableId().toTablePath().equals(tablePath))) {
+                tables.removeIf(t -> 
t.getTableId().toTablePath().equals(tablePath));
+            } else {
+                if (ignoreIfNotExists) {
+                    log.debug("Table {} not exists, ignore", 
tablePath.getFullName());
+                } else {
+                    throw new TableNotExistException(name, tablePath);
+                }
+            }
+        } else {
+            throw new DatabaseNotExistException(name, 
tablePath.getDatabaseName());
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            if (ignoreIfExists) {
+                log.debug("Database {} already exists, ignore", 
tablePath.getDatabaseName());
+            } else {
+                throw new DatabaseAlreadyExistException(name, 
tablePath.getDatabaseName());
+            }
+        } else {
+            catalogTables.put(tablePath.getDatabaseName(), new ArrayList<>());
+        }
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            catalogTables.remove(tablePath.getDatabaseName());
+        } else {
+            if (ignoreIfNotExists) {
+                log.debug("Database {} not exists, ignore", 
tablePath.getDatabaseName());
+            } else {
+                throw new DatabaseNotExistException(name, 
tablePath.getDatabaseName());
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogFactory.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogFactory.java
new file mode 100644
index 0000000000..f6876640d3
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class InMemoryCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new InMemoryCatalog(catalogName, options);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "InMemory";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(InMemoryCatalogOptionRule.username, 
InMemoryCatalogOptionRule.password)
+                .optional(InMemoryCatalogOptionRule.host, 
InMemoryCatalogOptionRule.port)
+                .build();
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogOptionRule.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogOptionRule.java
new file mode 100644
index 0000000000..d835983ee4
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogOptionRule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class InMemoryCatalogOptionRule {
+
+    public static final Option<String> username =
+            
Options.key("username").stringType().noDefaultValue().withDescription("username");
+
+    public static final Option<String> password =
+            
Options.key("password").stringType().noDefaultValue().withDescription("password");
+
+    public static final Option<String> host =
+            
Options.key("host").stringType().defaultValue("localhost").withDescription("host");
+
+    public static final Option<Integer> port =
+            
Options.key("port").intType().defaultValue(5081).withDescription("port");
+}
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf 
b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
new file mode 100644
index 0000000000..485f026a0d
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+}
+
+source {
+  InMemory {
+    result_table_name = "fake"
+    username = "st"
+    password = "stpassword"
+    table-names = ["st.public.table1", "st.public.table2"]
+    parallelism = 3
+  }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    source_table_name = "fake"
+    username = "st"
+    password = "stpassword"
+    address = "localhost"
+    port = 1234
+  }
+}
\ No newline at end of file

Reply via email to