This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cae66b6221 [Feature][Catalog] Add InMemoryCatalog for test and add new
getCatalogTableFromConfig method (#5485)
cae66b6221 is described below
commit cae66b6221919c20f1f48f763eb21c4806f32551
Author: Jia Fan <[email protected]>
AuthorDate: Tue Sep 19 16:31:40 2023 +0800
[Feature][Catalog] Add InMemoryCatalog for test and add new
getCatalogTableFromConfig method (#5485)
---
.../seatunnel/api/table/catalog/Catalog.java | 2 +-
.../api/table/catalog/CatalogOptions.java | 9 -
.../api/table/catalog/CatalogTableUtil.java | 62 ++++++
.../api/table/catalog/TableIdentifier.java | 8 +
.../api/table/catalog/CatalogTableUtilTest.java | 57 +++--
.../api/table/catalog/InMemoryCatalog.java | 243 +++++++++++++++++++++
.../api/table/catalog/InMemoryCatalogFactory.java | 46 ++++
.../table/catalog/InMemoryCatalogOptionRule.java | 36 +++
.../src/test/resources/conf/getCatalogTable.conf | 44 ++++
9 files changed, 484 insertions(+), 23 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index c84d6684a9..12e485b659 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -125,7 +125,7 @@ public interface Catalog extends AutoCloseable {
// Get the list of specified tables
List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
List<CatalogTable> catalogTables = new ArrayList<>();
- if (tableNames != null && tableNames.size() >= 1) {
+ if (tableNames != null && !tableNames.isEmpty()) {
for (String tableName : tableNames) {
TablePath tablePath = TablePath.of(tableName);
if (this.tableExists(tablePath)) {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
index 4e8c6e68fb..326fd2fac0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
@@ -17,10 +17,8 @@
package org.apache.seatunnel.api.table.catalog;
-import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.List;
import java.util.Map;
@@ -56,11 +54,4 @@ public interface CatalogOptions {
.withDescription(
"The table names RegEx of the database to capture."
+ "The table name needs to include the
database name, for example: database_.*\\.table_.*");
-
- OptionRule.Builder BASE_RULE =
- OptionRule.builder()
- .optional(CommonOptions.FACTORY_ID)
- .optional(NAME)
- .optional(DATABASE_PATTERN)
- .exclusive(TABLE_PATTERN, TABLE_NAMES);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 0ab5e8799b..07c3473bff 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -92,6 +93,8 @@ public class CatalogTableUtil implements Serializable {
"It is converted from RowType and only has column
information.");
}
+ // TODO remove this method after
https://github.com/apache/seatunnel/issues/5483 done.
+ @Deprecated
public static List<CatalogTable> getCatalogTables(Config config,
ClassLoader classLoader) {
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
Map<String, String> catalogOptions =
@@ -132,6 +135,65 @@ public class CatalogTableUtil implements Serializable {
.orElse(Collections.emptyList());
}
+ /**
+ * Get catalog table from config, if schema is specified, return a catalog
table with specified
+ * schema, otherwise, return a catalog table with schema from catalog.
+ *
+ * @deprecated DO NOT invoke it in any new
TableSourceFactory/TableSinkFactory, please directly
+ * use TableSourceFactory/TableSinkFactory instance to get
CatalogTable. We just use it to
+ * transition the old CatalogTable creation logic. Details please <a
+ *
href="https://cwiki.apache.org/confluence/display/SEATUNNEL/STIP5-Refactor+Catalog+and+CatalogTable">check
+ * </a>
+ */
+ @Deprecated
+ public static List<CatalogTable> getCatalogTablesFromConfig(
+ Config config, ClassLoader classLoader) {
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+
+ // We use plugin_name as factoryId, so MySQL-CDC should be MySQL
+ String factoryId =
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
+ // Highest priority: specified schema
+ Map<String, String> schemaMap =
readonlyConfig.get(CatalogTableUtil.SCHEMA);
+ if (schemaMap != null) {
+ if (schemaMap.isEmpty()) {
+ throw new SeaTunnelException("Schema config can not be empty");
+ }
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(config).getCatalogTable();
+ return Collections.singletonList(catalogTable);
+ }
+
+ Optional<Catalog> optionalCatalog =
+ FactoryUtil.createOptionalCatalog(
+ factoryId, readonlyConfig, classLoader, factoryId);
+ return optionalCatalog
+ .map(
+ c -> {
+ long startTime = System.currentTimeMillis();
+ try (Catalog catalog = c) {
+ catalog.open();
+ List<CatalogTable> catalogTables =
+ catalog.getTables(readonlyConfig);
+ log.info(
+ String.format(
+ "Get catalog tables, cost
time: %d",
+ System.currentTimeMillis() -
startTime));
+ if (catalogTables.isEmpty()) {
+ throw new SeaTunnelException(
+ String.format(
+ "Can not find catalog
table with factoryId [%s]",
+ factoryId));
+ }
+ return catalogTables;
+ }
+ })
+ .orElseThrow(
+ () ->
+ new SeaTunnelException(
+ String.format(
+ "Can not find catalog with
factoryId [%s]",
+ factoryId)));
+ }
+
public static CatalogTableUtil buildWithConfig(Config config) {
CheckResult checkResult = CheckConfigUtil.checkAllExists(config,
"schema");
if (!checkResult.isSuccess()) {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index 335ffebc4b..2d39f9b984 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -41,6 +41,14 @@ public final class TableIdentifier implements Serializable {
return new TableIdentifier(catalogName, databaseName, null, tableName);
}
+ public static TableIdentifier of(String catalogName, TablePath tablePath) {
+ return new TableIdentifier(
+ catalogName,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
public static TableIdentifier of(
String catalogName, String databaseName, String schemaName, String
tableName) {
return new TableIdentifier(catalogName, databaseName, schemaName,
tableName);
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index 3d8c8344cd..97df47f555 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -37,17 +38,17 @@ import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.seatunnel.api.table.catalog.CatalogOptions.TABLE_NAMES;
+import static
org.apache.seatunnel.common.constants.CollectionConstants.PLUGIN_NAME;
public class CatalogTableUtilTest {
@Test
public void testSimpleSchemaParse() throws FileNotFoundException,
URISyntaxException {
String path = getTestConfigFile("/conf/simple.schema.conf");
- Config config =
- ConfigFactory.parseFile(new File(path))
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ Config config = ConfigFactory.parseFile(new File(path));
SeaTunnelRowType seaTunnelRowType =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
Assertions.assertNotNull(seaTunnelRowType);
@@ -61,12 +62,7 @@ public class CatalogTableUtilTest {
@Test
public void testComplexSchemaParse() throws FileNotFoundException,
URISyntaxException {
String path = getTestConfigFile("/conf/complex.schema.conf");
- Config config =
- ConfigFactory.parseFile(new File(path))
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
-
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ Config config = ConfigFactory.parseFile(new File(path));
SeaTunnelRowType seaTunnelRowType =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
Assertions.assertNotNull(seaTunnelRowType);
@@ -88,6 +84,41 @@ public class CatalogTableUtilTest {
"row",
nestedRowFieldType.getFieldName(nestedRowFieldType.indexOf("row")));
}
+ @Test
+ public void testCatalogUtilGetCatalogTable() throws FileNotFoundException,
URISyntaxException {
+ String path = getTestConfigFile("/conf/getCatalogTable.conf");
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config source = config.getConfigList("source").get(0);
+ List<CatalogTable> catalogTables =
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ source,
Thread.currentThread().getContextClassLoader());
+ Assertions.assertEquals(2, catalogTables.size());
+ Assertions.assertEquals(
+ TableIdentifier.of("InMemory",
TablePath.of("st.public.table1")),
+ catalogTables.get(0).getTableId());
+ Assertions.assertEquals(
+ TableIdentifier.of("InMemory",
TablePath.of("st.public.table2")),
+ catalogTables.get(1).getTableId());
+ // test empty tables
+ Config emptyTableSource =
+ source.withValue(
+ TABLE_NAMES.key(), ConfigValueFactory.fromIterable(new
ArrayList<>()));
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ emptyTableSource,
Thread.currentThread().getContextClassLoader()));
+ // test unknown catalog
+ Config cannotFindCatalogSource =
+ source.withValue(PLUGIN_NAME,
ConfigValueFactory.fromAnyRef("unknownCatalog"));
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ cannotFindCatalogSource,
+
Thread.currentThread().getContextClassLoader()));
+ }
+
public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = CatalogTableUtilTest.class.getResource(configFile);
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
new file mode 100644
index 0000000000..8a9a9ddef4
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class InMemoryCatalog implements Catalog {
+ private final ReadonlyConfig options;
+ private final String name;
+ // database -> tables
+ private final Map<String, List<CatalogTable>> catalogTables;
+ private static final String DEFAULT_DATABASE = "default";
+
+ InMemoryCatalog(String catalogName, ReadonlyConfig options) {
+ this.name = catalogName;
+ this.options = options;
+ this.catalogTables = new HashMap<>();
+ addDefaultTable();
+ }
+
+ // Add some default table for testing
+ private void addDefaultTable() {
+ this.catalogTables.put(DEFAULT_DATABASE, new ArrayList<>());
+ List<CatalogTable> tables = new ArrayList<>();
+ this.catalogTables.put("st", tables);
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .column(PhysicalColumn.of("id", BasicType.LONG_TYPE,
22, false, null, "id"))
+ .column(
+ PhysicalColumn.of(
+ "name", BasicType.STRING_TYPE, 128,
false, null, "name"))
+ .column(
+ PhysicalColumn.of(
+ "age", BasicType.INT_TYPE, null, true,
null, "age"))
+ .column(
+ PhysicalColumn.of(
+ "createTime",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ 3,
+ true,
+ null,
+ "createTime"))
+ .column(
+ PhysicalColumn.of(
+ "lastUpdateTime",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ 3,
+ true,
+ null,
+ "lastUpdateTime"))
+ .primaryKey(PrimaryKey.of("id",
Lists.newArrayList("id")))
+ .constraintKey(
+ ConstraintKey.of(
+ ConstraintKey.ConstraintType.KEY,
+ "name",
+ Lists.newArrayList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "name", null))))
+ .build();
+ CatalogTable catalogTable1 =
+ CatalogTable.of(
+ TableIdentifier.of(name, TablePath.of("st", "public",
"table1")),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "In Memory Table");
+ CatalogTable catalogTable2 =
+ CatalogTable.of(
+ TableIdentifier.of(name, TablePath.of("st", "public",
"table2")),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "In Memory Table",
+ name);
+ tables.add(catalogTable1);
+ tables.add(catalogTable2);
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ String username = options.get(InMemoryCatalogOptionRule.username);
+ String password = options.get(InMemoryCatalogOptionRule.password);
+ String host = options.get(InMemoryCatalogOptionRule.host);
+ int port = options.get(InMemoryCatalogOptionRule.port);
+ log.trace(
+ String.format(
+ "InMemoryCatalog %s opening with %s/%s in %s:%s",
+ name, username, password, host, port));
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ log.trace(String.format("InMemoryCatalog %s closing", name));
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return DEFAULT_DATABASE;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return catalogTables.containsKey(databaseName);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return new ArrayList<>(catalogTables.keySet());
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ return catalogTables.get(databaseName).stream()
+ .map(
+ table ->
+ table.getTableId().getSchemaName()
+ + "."
+ + table.getTableId().getTableName())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ List<CatalogTable> tables =
catalogTables.get(tablePath.getDatabaseName());
+ return tables.stream().anyMatch(t ->
t.getTableId().toTablePath().equals(tablePath));
+ }
+ return false;
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ List<CatalogTable> tables =
catalogTables.get(tablePath.getDatabaseName());
+ return tables.stream()
+ .filter(t ->
t.getTableId().toTablePath().equals(tablePath))
+ .findFirst()
+ .orElseThrow(() -> new TableNotExistException(name,
tablePath));
+ } else {
+ throw new TableNotExistException(name, tablePath);
+ }
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ List<CatalogTable> tables =
catalogTables.get(tablePath.getDatabaseName());
+ if (tables.stream().anyMatch(t ->
t.getTableId().toTablePath().equals(tablePath))) {
+ if (ignoreIfExists) {
+ log.debug("Table {} already exists, ignore",
tablePath.getFullName());
+ } else {
+ throw new TableAlreadyExistException(name, tablePath);
+ }
+ } else {
+ tables.add(table);
+ }
+ } else {
+ throw new DatabaseNotExistException(name,
tablePath.getDatabaseName());
+ }
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ List<CatalogTable> tables =
catalogTables.get(tablePath.getDatabaseName());
+ if (tables.stream().anyMatch(t ->
t.getTableId().toTablePath().equals(tablePath))) {
+ tables.removeIf(t ->
t.getTableId().toTablePath().equals(tablePath));
+ } else {
+ if (ignoreIfNotExists) {
+ log.debug("Table {} not exists, ignore",
tablePath.getFullName());
+ } else {
+ throw new TableNotExistException(name, tablePath);
+ }
+ }
+ } else {
+ throw new DatabaseNotExistException(name,
tablePath.getDatabaseName());
+ }
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ if (ignoreIfExists) {
+ log.debug("Database {} already exists, ignore",
tablePath.getDatabaseName());
+ } else {
+ throw new DatabaseAlreadyExistException(name,
tablePath.getDatabaseName());
+ }
+ } else {
+ catalogTables.put(tablePath.getDatabaseName(), new ArrayList<>());
+ }
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ catalogTables.remove(tablePath.getDatabaseName());
+ } else {
+ if (ignoreIfNotExists) {
+ log.debug("Database {} not exists, ignore",
tablePath.getDatabaseName());
+ } else {
+ throw new DatabaseNotExistException(name,
tablePath.getDatabaseName());
+ }
+ }
+ }
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogFactory.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogFactory.java
new file mode 100644
index 0000000000..f6876640d3
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class InMemoryCatalogFactory implements CatalogFactory {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ return new InMemoryCatalog(catalogName, options);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "InMemory";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(InMemoryCatalogOptionRule.username,
InMemoryCatalogOptionRule.password)
+ .optional(InMemoryCatalogOptionRule.host,
InMemoryCatalogOptionRule.port)
+ .build();
+ }
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogOptionRule.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogOptionRule.java
new file mode 100644
index 0000000000..d835983ee4
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalogOptionRule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class InMemoryCatalogOptionRule {
+
+ public static final Option<String> username =
+
Options.key("username").stringType().noDefaultValue().withDescription("username");
+
+ public static final Option<String> password =
+
Options.key("password").stringType().noDefaultValue().withDescription("password");
+
+ public static final Option<String> host =
+
Options.key("host").stringType().defaultValue("localhost").withDescription("host");
+
+ public static final Option<Integer> port =
+
Options.key("port").intType().defaultValue(5081).withDescription("port");
+}
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
new file mode 100644
index 0000000000..485f026a0d
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+}
+
+source {
+ InMemory {
+ result_table_name = "fake"
+ username = "st"
+ password = "stpassword"
+ table-names = ["st.public.table1", "st.public.table2"]
+ parallelism = 3
+ }
+}
+
+transform {
+}
+
+sink {
+ InMemory {
+ source_table_name = "fake"
+ username = "st"
+ password = "stpassword"
+ address = "localhost"
+ port = 1234
+ }
+}
\ No newline at end of file