This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2c67cd8f3e [Feature] Add `table-names` from FakeSource/Assert to 
produce/assert multi-table (#5604)
2c67cd8f3e is described below

commit 2c67cd8f3e9950a5344512208cb05194ca297acc
Author: Jia Fan <[email protected]>
AuthorDate: Fri Oct 13 11:48:18 2023 +0800

    [Feature] Add `table-names` from FakeSource/Assert to produce/assert 
multi-table (#5604)
---
 .github/workflows/backend.yml                      |  2 +-
 docs/en/connector-v2/sink/Assert.md                |  5 ++
 docs/en/connector-v2/source/FakeSource.md          | 24 ++++++++
 .../seatunnel/assertion/rule/AssertTableRule.java  | 30 ++++++++++
 .../seatunnel/assertion/sink/AssertSink.java       | 29 ++++++++-
 .../seatunnel/assertion/sink/AssertSinkWriter.java | 20 ++++++-
 .../seatunnel/fake/config/FakeConfig.java          | 13 ++++
 .../seatunnel/fake/source/FakeDataGenerator.java   | 13 +++-
 .../seatunnel/fake/source/FakeSource.java          | 16 ++++-
 .../seatunnel/fake/source/FakeSourceFactory.java   |  4 +-
 .../e2e/connector/fake/FakeWithTableNamesTT.java   | 43 ++++++++++++++
 .../resources/fake_to_assert_with_tablenames.conf  | 69 ++++++++++++++++++++++
 12 files changed, 260 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 078a4d5e6e..0d59223364 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -412,7 +412,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 90
+    timeout-minutes: 120
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/sink/Assert.md 
b/docs/en/connector-v2/sink/Assert.md
index 175df92854..dff2657eaf 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -41,6 +41,7 @@ A flink sink plugin which can assert illegal data by user 
defined rules
 | rules.catalog_table_rule.column_rule.nullable                                
                  | boolean    | no       | -             |
 | rules.catalog_table_rule.column_rule.default_value                           
                  | string     | no       | -             |
 | rules.catalog_table_rule.column_rule.comment                                 
                  | comment    | no       | -             |
+| rules.table-names                                                            
                  | list       | no       | -             |
 | common-options                                                               
                  |            | no       | -             |
 
 ### rules [ConfigMap]
@@ -82,6 +83,10 @@ the value related to rule type
 
 Used to assert the catalog table is same with the user defined table.
 
+### table-names [ConfigList]
+
+Used to assert the table should be in the data.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/FakeSource.md 
b/docs/en/connector-v2/source/FakeSource.md
index d393d8f570..b7d3d3d1f5 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -54,6 +54,7 @@ just for some test cases such as type conversion or connector 
new feature testin
 | double.min          | double   | no       | 0                       |
 | double.max          | double   | no       | 0x1.fffffffffffffP+1023 |
 | double.template     | list     | no       | -                       |
+| table-names         | list     | no       | -                       |
 | common-options      |          | no       | -                       |
 
 ### schema [config]
@@ -262,6 +263,10 @@ The max value of double data that connector generated
 
 The template list of double type that connector generated, if user configured 
it, connector will randomly select an item from the template list
 
+### table-names
+
+The table list that connector generated, used to simulate multi-table scenarios
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
@@ -421,6 +426,25 @@ FakeSource {
 }
 ```
 
+Use table-names
+
+```hocon
+FakeSource {
+  table-names = ["test.table1", "test.table2"]
+  schema {
+    fields {
+      c_string = string
+      c_tinyint = tinyint
+      c_smallint = smallint
+      c_int = int
+      c_bigint = bigint
+      c_float = float
+      c_double = double
+    }
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertTableRule.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertTableRule.java
new file mode 100644
index 0000000000..02da760be7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertTableRule.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.rule;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class AssertTableRule implements Serializable {
+    private List<String> tableNames;
+}
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 5bdf46cebe..97106bea02 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -31,6 +32,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertCatalogTableRule;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
+import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
@@ -39,6 +41,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import com.google.auto.service.AutoService;
 import com.google.common.base.Throwables;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
@@ -52,6 +55,7 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     private CatalogTable catalogTable;
     private List<AssertFieldRule> assertFieldRules;
     private List<AssertFieldRule.AssertRule> assertRowRules;
+    private AssertTableRule assertTableRule;
 
     private AssertCatalogTableRule assertCatalogTableRule;
 
@@ -82,9 +86,17 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             assertCatalogTableRule.checkRule(catalogTable);
         }
 
+        if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
+            assertTableRule =
+                    new 
AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
+        } else {
+            assertTableRule = new AssertTableRule(new ArrayList<>());
+        }
+
         if (CollectionUtils.isEmpty(configList)
                 && CollectionUtils.isEmpty(rowConfigList)
-                && assertCatalogTableRule == null) {
+                && assertCatalogTableRule == null
+                && assertTableRule.getTableNames().isEmpty()) {
             Throwables.propagateIfPossible(
                     new ConfigException.BadValue(
                             RULES.key(), "Assert rule config is empty, please 
add rule config."));
@@ -103,7 +115,8 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
-        return new AssertSinkWriter(seaTunnelRowType, assertFieldRules, 
assertRowRules);
+        return new AssertSinkWriter(
+                seaTunnelRowType, assertFieldRules, assertRowRules, 
assertTableRule);
     }
 
     @Override
@@ -123,7 +136,17 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             assertFieldRules = new AssertRuleParser().parseRules(configList);
         }
 
-        if (CollectionUtils.isEmpty(configList) && 
CollectionUtils.isEmpty(rowConfigList)) {
+        if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
+            assertTableRule =
+                    new 
AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
+        } else {
+            assertTableRule = new AssertTableRule(new ArrayList<>());
+        }
+
+        if (CollectionUtils.isEmpty(configList)
+                && CollectionUtils.isEmpty(rowConfigList)
+                && assertCatalogTableRule == null
+                && assertTableRule.getTableNames().isEmpty()) {
             Throwables.propagateIfPossible(
                     new ConfigException.BadValue(
                             RULES.key(), "Assert rule config is empty, please 
add rule config."));
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
index ee865ad9da..c0bc0d9718 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
@@ -23,10 +23,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecu
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.LongAccumulator;
 
 public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -34,21 +38,26 @@ public class AssertSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     private final SeaTunnelRowType seaTunnelRowType;
     private final List<AssertFieldRule> assertFieldRules;
     private final List<AssertFieldRule.AssertRule> assertRowRules;
+    private final AssertTableRule assertTableRule;
     private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
     private static final LongAccumulator LONG_ACCUMULATOR = new 
LongAccumulator(Long::sum, 0);
+    private static final Set<String> TABLE_NAMES = new CopyOnWriteArraySet<>();
 
     public AssertSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
             List<AssertFieldRule> assertFieldRules,
-            List<AssertFieldRule.AssertRule> assertRowRules) {
+            List<AssertFieldRule.AssertRule> assertRowRules,
+            AssertTableRule assertTableRule) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.assertFieldRules = assertFieldRules;
         this.assertRowRules = assertRowRules;
+        this.assertTableRule = assertTableRule;
     }
 
     @Override
     public void write(SeaTunnelRow element) {
         LONG_ACCUMULATOR.accumulate(1);
+        TABLE_NAMES.add(element.getTableId());
         if (Objects.nonNull(assertFieldRules)) {
             ASSERT_EXECUTOR
                     .fail(element, seaTunnelRowType, assertFieldRules)
@@ -89,5 +98,14 @@ public class AssertSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
                                                 + failRule);
                             });
         }
+        if (!assertTableRule.getTableNames().isEmpty()
+                && !new 
HashSet<>(assertTableRule.getTableNames()).equals(TABLE_NAMES)) {
+            throw new AssertConnectorException(
+                    AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
+                    "table names: "
+                            + TABLE_NAMES
+                            + " is not equal to "
+                            + assertTableRule.getTableNames());
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index ef6ad79d37..f3aeb8d88e 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -20,6 +20,9 @@ package org.apache.seatunnel.connectors.seatunnel.fake.config;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
 
@@ -146,6 +149,8 @@ public class FakeConfig implements Serializable {
 
     private List<RowData> fakeRows;
 
+    @Builder.Default private List<TableIdentifier> tableIdentifiers = new 
ArrayList<>();
+
     // todo: use ReadonlyConfig
     public static FakeConfig buildWithConfig(Config config) {
         FakeConfigBuilder builder = FakeConfig.builder();
@@ -407,6 +412,14 @@ public class FakeConfig implements Serializable {
             builder.doubleFakeMode(
                     
FakeOption.FakeMode.parse(config.getString(DOUBLE_FAKE_MODE.key())));
         }
+        if (config.hasPath(CatalogOptions.TABLE_NAMES.key())) {
+            List<String> tableNames = 
config.getStringList(CatalogOptions.TABLE_NAMES.key());
+            List<TableIdentifier> tableIdentifiers = new 
ArrayList<>(tableNames.size());
+            for (String tableName : tableNames) {
+                tableIdentifiers.add(TableIdentifier.of("fake", 
TablePath.of(tableName)));
+            }
+            builder.tableIdentifiers(tableIdentifiers);
+        }
         return builder.build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index d643cf8d9d..90a33304e3 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -32,6 +32,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
 import 
org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
+import org.apache.commons.lang3.RandomUtils;
+
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
@@ -74,7 +76,16 @@ public class FakeDataGenerator {
         for (SeaTunnelDataType<?> fieldType : fieldTypes) {
             randomRow.add(randomColumnValue(fieldType));
         }
-        return new SeaTunnelRow(randomRow.toArray());
+        SeaTunnelRow row = new SeaTunnelRow(randomRow.toArray());
+        if (!fakeConfig.getTableIdentifiers().isEmpty()) {
+            row.setTableId(
+                    fakeConfig
+                            .getTableIdentifiers()
+                            .get(RandomUtils.nextInt(0, 
fakeConfig.getTableIdentifiers().size()))
+                            .toTablePath()
+                            .toString());
+        }
+        return row;
     }
 
     /**
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index c03f9f9725..15118d2bff 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Lists;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSource.class)
 public class FakeSource
@@ -73,7 +74,20 @@ public class FakeSource
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        return Lists.newArrayList(catalogTable);
+        if (fakeConfig.getTableIdentifiers().isEmpty()) {
+            return Lists.newArrayList(catalogTable);
+        } else {
+            return fakeConfig.getTableIdentifiers().stream()
+                    .map(
+                            tableIdentifier ->
+                                    CatalogTable.of(
+                                            tableIdentifier,
+                                            catalogTable.getTableSchema(),
+                                            catalogTable.getOptions(),
+                                            catalogTable.getPartitionKeys(),
+                                            catalogTable.getComment()))
+                    .collect(Collectors.toList());
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
index ee668fa929..91ef55950c 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -97,7 +98,8 @@ public class FakeSourceFactory implements TableSourceFactory {
                         DATE_DAY_TEMPLATE,
                         TIME_HOUR_TEMPLATE,
                         TIME_MINUTE_TEMPLATE,
-                        TIME_SECOND_TEMPLATE)
+                        TIME_SECOND_TEMPLATE,
+                        CatalogOptions.TABLE_NAMES)
                 .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithTableNamesTT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithTableNamesTT.java
new file mode 100644
index 0000000000..50e58b1ae3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithTableNamesTT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.fake;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support 
CatalogTable")
+public class FakeWithTableNamesTT extends TestSuiteBase {
+    @TestTemplate
+    public void testFakeConnector(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult fakeWithTableNames =
+                container.executeJob("/fake_to_assert_with_tablenames.conf");
+        Assertions.assertEquals(0, fakeWithTableNames.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
new file mode 100644
index 0000000000..32fb751610
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    table-names = ["test.table1", "test.table2"]
+    schema = {
+      columns = [
+        {
+            name = id
+            type = bigint
+        }
+        {
+            name = name
+            type = string
+        }
+        {
+            name = age
+            type = int
+        }
+      ]
+      primaryKey = {
+        name = "primary key"
+        columnNames = ["id"]
+      }
+      constraintKeys = [
+          {
+              constraintName = "unique_name"
+              constraintType = UNIQUE_KEY
+              constraintColumns = [
+                  {
+                      columnName = "id"
+                      sortType = ASC
+                  }
+              ]
+          }
+      ]
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink{
+  Assert {
+      rules {
+        table-names = ["test.table1", "test.table2"]
+      }
+    }
+}
\ No newline at end of file

Reply via email to