This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 652921fb75 Support config tableIdentifier for schema (#5628)
652921fb75 is described below

commit 652921fb7523c4deb86f972352daf86795c6b9a0
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Oct 17 14:23:56 2023 +0800

    Support config tableIdentifier for schema (#5628)
---
 docs/en/concept/schema-feature.md                  | 18 +++++++++++
 docs/en/connector-v2/source/FakeSource.md          | 16 +++++++++-
 .../seatunnel/api/table/catalog/CatalogTable.java  |  8 +++--
 .../api/table/catalog/CatalogTableUtil.java        | 37 +++++++++++++++++++---
 .../seatunnel/api/table/catalog/TablePath.java     | 16 +++++++++-
 .../table/catalog/schema/TableSchemaOptions.java   | 21 ++++++++++++
 .../assertion/rule/AssertCatalogTableRule.java     | 27 ++++++++++++++++
 .../rule/AssertCatalogTableRuleParser.java         | 19 +++++++++++
 .../seatunnel/assertion/sink/AssertConfig.java     |  7 ++++
 .../seatunnel/fake/config/FakeConfig.java          |  2 +-
 .../seatunnel/fake/source/FakeSource.java          | 32 +++++++++++--------
 .../fake_to_assert_with_catalogtable.conf          |  6 ++++
 12 files changed, 187 insertions(+), 22 deletions(-)

diff --git a/docs/en/concept/schema-feature.md 
b/docs/en/concept/schema-feature.md
index 01b5072800..4c95e79fef 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -11,6 +11,9 @@ We can use SchemaOptions to define schema, the SchemaOptions 
contains some confi
 
 ```
 schema = {
+    table = "database.schema.table"
+    schema_first = false
+    comment = "comment"
     columns = [
     ...
     ]
@@ -24,6 +27,20 @@ schema = {
 }
 ```
 
+### table
+
+The full name of the table that the schema belongs to. It can contain the 
database, schema, and table name, e.g. `database.schema.table`, 
`database.table`, `table`.
+
+### schema_first
+
+Default is false.
+
+If `schema_first` is true, the schema takes precedence: if we 
set `table = "a.b"`, `a` will be parsed as the schema rather than the database, so we 
can write `table = "schema.table"`.
+
+### comment
+
+The comment of the CatalogTable which the schema belongs to.
+
 ### Columns
 
 Columns is a list of config used to define the column in schema, each column 
can contains name, type, nullable, defaultValue, comment field.
@@ -131,6 +148,7 @@ source {
     result_table_name = "fake"
     row.num = 16
     schema {
+        table = "FakeDatabase.FakeTable"
         columns = [
            {
               name = id
diff --git a/docs/en/connector-v2/source/FakeSource.md 
b/docs/en/connector-v2/source/FakeSource.md
index b7d3d3d1f5..d23fc1a586 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -265,7 +265,21 @@ The template list of double type that connector generated, 
if user configured it
 
 ### table-names
 
-The table list that connector generated, used to simulate multi-table scenarios
+The table list that connector generated, used to simulate multi-table 
scenarios.
+
+This option will override the `table` option in the `schema` option.
+For example, if you configure the `table-names` option as follows, the 
connector will generate data for the `test.table1` and `test.table2` tables, 
and the `database.schema.table` setting will be ignored.
+
+```agsl
+FakeSource {
+    table-names = ["test.table1", "test.table2"]
+    schema = {
+        table = "database.schema.table"
+        ...
+    }
+    ...
+}
+```
 
 ### common options
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index 5b1f124863..d4c8d1e332 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -48,7 +48,8 @@ public final class CatalogTable implements Serializable {
             Map<String, String> options,
             List<String> partitionKeys,
             String comment) {
-        return new CatalogTable(tableId, tableSchema, options, partitionKeys, 
comment);
+        return new CatalogTable(
+                tableId, tableSchema, options, partitionKeys, comment, 
tableId.getCatalogName());
     }
 
     public static CatalogTable of(
@@ -67,7 +68,7 @@ public final class CatalogTable implements Serializable {
             Map<String, String> options,
             List<String> partitionKeys,
             String comment) {
-        this(tableId, tableSchema, options, partitionKeys, comment, "");
+        this(tableId, tableSchema, options, partitionKeys, comment, 
tableId.getCatalogName());
     }
 
     private CatalogTable(
@@ -127,6 +128,9 @@ public final class CatalogTable implements Serializable {
                 + ", comment='"
                 + comment
                 + '\''
+                + ", catalogName='"
+                + catalogName
+                + '\''
                 + '}';
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 880d97ca46..b258faaf4d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -31,6 +31,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 
+import org.apache.commons.lang3.StringUtils;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
@@ -135,7 +137,7 @@ public class CatalogTableUtil implements Serializable {
             if (schemaMap.isEmpty()) {
                 throw new SeaTunnelException("Schema config can not be empty");
             }
-            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
+            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(factoryId, readonlyConfig);
             return Collections.singletonList(catalogTable);
         }
 
@@ -190,6 +192,9 @@ public class CatalogTableUtil implements Serializable {
         }
     }
 
+    // Prefer buildWithConfig(String catalogName, ReadonlyConfig 
readonlyConfig) instead,
+    // since this method does not inject the correct catalogName into 
CatalogTable.
+    @Deprecated
     public static List<CatalogTable> convertDataTypeToCatalogTables(
             SeaTunnelDataType<?> seaTunnelDataType, String tableId) {
         List<CatalogTable> catalogTables;
@@ -210,18 +215,42 @@ public class CatalogTableUtil implements Serializable {
     }
 
     public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
+        return buildWithConfig("", readonlyConfig);
+    }
+
+    public static CatalogTable buildWithConfig(String catalogName, 
ReadonlyConfig readonlyConfig) {
         if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
             throw new RuntimeException(
                     "Schema config need option [schema], please correct your 
config first");
         }
         TableSchema tableSchema = new 
ReadonlyConfigParser().parse(readonlyConfig);
+
+        ReadonlyConfig schemaConfig =
+                readonlyConfig
+                        .getOptional(TableSchemaOptions.SCHEMA)
+                        .map(ReadonlyConfig::fromMap)
+                        .orElseThrow(
+                                () -> new IllegalArgumentException("Schema 
config can't be null"));
+
+        TablePath tablePath;
+        if (StringUtils.isNotEmpty(
+                
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE))) {
+            tablePath =
+                    TablePath.of(
+                            
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE),
+                            schemaConfig.get(
+                                    
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
+        } else {
+            tablePath = TablePath.EMPTY;
+        }
+
         return CatalogTable.of(
-                // TODO: other table info
-                TableIdentifier.of("", "", ""),
+                TableIdentifier.of(catalogName, tablePath),
                 tableSchema,
                 new HashMap<>(),
+                // todo: add partitionKeys?
                 new ArrayList<>(),
-                "");
+                
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
     }
 
     public static SeaTunnelRowType buildSimpleTextSchema() {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 358e873b99..d123d3a330 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -34,12 +34,26 @@ public final class TablePath implements Serializable {
     private final String schemaName;
     private final String tableName;
 
+    public static final TablePath EMPTY = TablePath.of(null, null, null);
+
     public static TablePath of(String fullName) {
+        return of(fullName, false);
+    }
+
+    public static TablePath of(String fullName, boolean schemaFirst) {
         String[] paths = fullName.split("\\.");
 
+        if (paths.length == 1) {
+            return of(null, paths[0]);
+        }
+
         if (paths.length == 2) {
-            return of(paths[0], paths[1]);
+            if (schemaFirst) {
+                return of(null, paths[0], paths[1]);
+            }
+            return of(paths[0], null, paths[1]);
         }
+
         if (paths.length == 3) {
             return of(paths[0], paths[1], paths[2]);
         }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
index 19f858b98f..492fe1909c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
@@ -28,6 +28,27 @@ import java.util.Map;
 
 public class TableSchemaOptions {
 
+    public static class TableIdentifierOptions {
+
+        public static final Option<Boolean> SCHEMA_FIRST =
+                Options.key("schema_first")
+                        .booleanType()
+                        .defaultValue(false)
+                        .withDescription("Parse Schema First from table");
+
+        public static final Option<String> TABLE =
+                Options.key("table")
+                        .stringType()
+                        .noDefaultValue()
+                        .withDescription("SeaTunnel Schema Full Table Name");
+
+        public static final Option<String> COMMENT =
+                Options.key("comment")
+                        .stringType()
+                        .noDefaultValue()
+                        .withDescription("SeaTunnel Schema Table Comment");
+    }
+
     public static final Option<Map<String, Object>> SCHEMA =
             Options.key("schema")
                     .type(new TypeReference<Map<String, Object>>() {})
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
index 5245663b54..9d4e35493a 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
 
@@ -48,6 +49,9 @@ public class AssertCatalogTableRule implements Serializable {
     @OptionMark(description = "column rule")
     private AssertColumnRule columnRule;
 
+    @OptionMark(description = "tableIdentifier rule")
+    private AssertTableIdentifierRule tableIdentifierRule;
+
     public void checkRule(CatalogTable catalogTable) {
         TableSchema tableSchema = catalogTable.getTableSchema();
         if (tableSchema == null) {
@@ -62,6 +66,9 @@ public class AssertCatalogTableRule implements Serializable {
         if (columnRule != null) {
             columnRule.checkRule(tableSchema.getColumns());
         }
+        if (tableIdentifierRule != null) {
+            tableIdentifierRule.checkRule(catalogTable.getTableId());
+        }
     }
 
     @Data
@@ -138,4 +145,24 @@ public class AssertCatalogTableRule implements 
Serializable {
             }
         }
     }
+
+    @Data
+    @AllArgsConstructor
+    public static class AssertTableIdentifierRule implements Serializable {
+
+        private TableIdentifier tableIdentifier;
+
+        public void checkRule(TableIdentifier actiualTableIdentifier) {
+            if (actiualTableIdentifier == null) {
+                throw new AssertConnectorException(CATALOG_TABLE_FAILED, 
"tableIdentifier is null");
+            }
+            if (!actiualTableIdentifier.equals(tableIdentifier)) {
+                throw new AssertConnectorException(
+                        CATALOG_TABLE_FAILED,
+                        String.format(
+                                "tableIdentifier: %s is not equal to %s",
+                                actiualTableIdentifier, tableIdentifier));
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
index dc5063a2c8..fee135d4f1 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 
@@ -46,6 +48,9 @@ import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_COLUMNS;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_RULE;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_CATALOG_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_RULE;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_TABLE_NAME;
 
 public class AssertCatalogTableRuleParser {
 
@@ -55,6 +60,7 @@ public class AssertCatalogTableRuleParser {
         
parsePrimaryKeyRule(catalogTableRule).ifPresent(tableRule::setPrimaryKeyRule);
         
parseConstraintKeyRule(catalogTableRule).ifPresent(tableRule::setConstraintKeyRule);
         parseColumnRule(catalogTableRule).ifPresent(tableRule::setColumnRule);
+        
parseTableIdentifierRule(catalogTableRule).ifPresent(tableRule::setTableIdentifierRule);
         return tableRule;
     }
 
@@ -156,4 +162,17 @@ public class AssertCatalogTableRuleParser {
                         .collect(Collectors.toList());
         return Optional.of(new 
AssertCatalogTableRule.AssertConstraintKeyRule(constraintKeys));
     }
+
+    private Optional<AssertCatalogTableRule.AssertTableIdentifierRule> 
parseTableIdentifierRule(
+            Config catalogTableRule) {
+        if (!catalogTableRule.hasPath(TABLE_IDENTIFIER_RULE)) {
+            return Optional.empty();
+        }
+        Config tableIdentifierRule = 
catalogTableRule.getConfig(TABLE_IDENTIFIER_RULE);
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(
+                        
tableIdentifierRule.getString(TABLE_IDENTIFIER_CATALOG_NAME),
+                        
TablePath.of(tableIdentifierRule.getString(TABLE_IDENTIFIER_TABLE_NAME)));
+        return Optional.of(new 
AssertCatalogTableRule.AssertTableIdentifierRule(tableIdentifier));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
index 5d9dddc58d..d86c034d68 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -64,6 +64,13 @@ public class AssertConfig {
     public static final String COLUMN_DEFAULT_VALUE = "default_value";
     public static final String COLUMN_COMMENT = "comment";
 
+    public static class TableIdentifierRule {
+        public static final String TABLE_IDENTIFIER_RULE = 
"table_identifier_rule";
+
+        public static final String TABLE_IDENTIFIER_CATALOG_NAME = 
"catalog_name";
+        public static final String TABLE_IDENTIFIER_TABLE_NAME = "table";
+    }
+
     public static final Option<String> COMMENT =
             Options.key("comment")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index f3aeb8d88e..e298fbea6d 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -416,7 +416,7 @@ public class FakeConfig implements Serializable {
             List<String> tableNames = 
config.getStringList(CatalogOptions.TABLE_NAMES.key());
             List<TableIdentifier> tableIdentifiers = new 
ArrayList<>(tableNames.size());
             for (String tableName : tableNames) {
-                tableIdentifiers.add(TableIdentifier.of("fake", 
TablePath.of(tableName)));
+                tableIdentifiers.add(TableIdentifier.of("FakeSource", 
TablePath.of(tableName)));
             }
             builder.tableIdentifiers(tableIdentifiers);
         }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 15118d2bff..e634347fde 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,6 +42,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import com.google.auto.service.AutoService;
 import com.google.common.collect.Lists;
 
@@ -61,7 +64,7 @@ public class FakeSource
     public FakeSource() {}
 
     public FakeSource(ReadonlyConfig readonlyConfig) {
-        this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), 
readonlyConfig);
         this.fakeConfig = 
FakeConfig.buildWithConfig(readonlyConfig.toConfig());
     }
 
@@ -74,20 +77,23 @@ public class FakeSource
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        if (fakeConfig.getTableIdentifiers().isEmpty()) {
+        // If tableNames is empty, there is only one catalogTable, so return 
the original
+        // catalogTable
+        if (CollectionUtils.isEmpty(fakeConfig.getTableIdentifiers())) {
             return Lists.newArrayList(catalogTable);
-        } else {
-            return fakeConfig.getTableIdentifiers().stream()
-                    .map(
-                            tableIdentifier ->
-                                    CatalogTable.of(
-                                            tableIdentifier,
-                                            catalogTable.getTableSchema(),
-                                            catalogTable.getOptions(),
-                                            catalogTable.getPartitionKeys(),
-                                            catalogTable.getComment()))
-                    .collect(Collectors.toList());
         }
+        // Otherwise, return the catalogTables with the tableNames
+        return fakeConfig.getTableIdentifiers().stream()
+                .map(
+                        tableIdentifier ->
+                                CatalogTable.of(
+                                        TableIdentifier.of(
+                                                getPluginName(), 
tableIdentifier.toTablePath()),
+                                        catalogTable.getTableSchema(),
+                                        catalogTable.getOptions(),
+                                        catalogTable.getPartitionKeys(),
+                                        catalogTable.getComment()))
+                .collect(Collectors.toList());
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
index 4a98662b76..ee33778556 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf
@@ -24,6 +24,7 @@ source {
   FakeSource {
     row.num = 100
     schema = {
+      table = "test.fakeTable"
       columns = [
         {
             name = id
@@ -63,6 +64,11 @@ sink{
   Assert {
       rules {
         catalog_table_rule {
+            table_identifier_rule = {
+                catalog_name = "FakeSource"
+                table = "test.fakeTable"
+            }
+
             primary_key_rule = {
                 primary_key_name = "primary key"
                 primary_key_columns = ["id"]

Reply via email to