This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2c67cd8f3e [Feature] Add `table-names` from FakeSource/Assert to
produce/assert multi-table (#5604)
2c67cd8f3e is described below
commit 2c67cd8f3e9950a5344512208cb05194ca297acc
Author: Jia Fan <[email protected]>
AuthorDate: Fri Oct 13 11:48:18 2023 +0800
[Feature] Add `table-names` from FakeSource/Assert to produce/assert
multi-table (#5604)
---
.github/workflows/backend.yml | 2 +-
docs/en/connector-v2/sink/Assert.md | 5 ++
docs/en/connector-v2/source/FakeSource.md | 24 ++++++++
.../seatunnel/assertion/rule/AssertTableRule.java | 30 ++++++++++
.../seatunnel/assertion/sink/AssertSink.java | 29 ++++++++-
.../seatunnel/assertion/sink/AssertSinkWriter.java | 20 ++++++-
.../seatunnel/fake/config/FakeConfig.java | 13 ++++
.../seatunnel/fake/source/FakeDataGenerator.java | 13 +++-
.../seatunnel/fake/source/FakeSource.java | 16 ++++-
.../seatunnel/fake/source/FakeSourceFactory.java | 4 +-
.../e2e/connector/fake/FakeWithTableNamesTT.java | 43 ++++++++++++++
.../resources/fake_to_assert_with_tablenames.conf | 69 ++++++++++++++++++++++
12 files changed, 260 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 078a4d5e6e..0d59223364 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -412,7 +412,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 90
+ timeout-minutes: 120
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/sink/Assert.md
b/docs/en/connector-v2/sink/Assert.md
index 175df92854..dff2657eaf 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -41,6 +41,7 @@ A flink sink plugin which can assert illegal data by user
defined rules
| rules.catalog_table_rule.column_rule.nullable
| boolean | no | - |
| rules.catalog_table_rule.column_rule.default_value
| string | no | - |
| rules.catalog_table_rule.column_rule.comment
| comment | no | - |
+| rules.table-names
| list | no | - |
| common-options
| | no | - |
### rules [ConfigMap]
@@ -82,6 +83,10 @@ the value related to rule type
Used to assert the catalog table is same with the user defined table.
+### table-names [ConfigList]
+
+Used to assert that the set of table names carried by the received rows is exactly equal to this list.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
index d393d8f570..b7d3d3d1f5 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -54,6 +54,7 @@ just for some test cases such as type conversion or connector
new feature testin
| double.min | double | no | 0 |
| double.max | double | no | 0x1.fffffffffffffP+1023 |
| double.template | list | no | - |
+| table-names | list | no | - |
| common-options | | no | - |
### schema [config]
@@ -262,6 +263,10 @@ The max value of double data that connector generated
The template list of double type that connector generated, if user configured
it, connector will randomly select an item from the template list
+### table-names
+
+The list of table names that the connector generates data for, used to simulate multi-table scenarios. Each generated row is assigned one of these table names at random.
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
@@ -421,6 +426,25 @@ FakeSource {
}
```
+Use table-names
+
+```hocon
+FakeSource {
+ table-names = ["test.table1", "test.table2"]
+ schema {
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ }
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertTableRule.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertTableRule.java
new file mode 100644
index 0000000000..02da760be7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertTableRule.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.rule;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class AssertTableRule implements Serializable {
+ private List<String> tableNames;
+}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 5bdf46cebe..97106bea02 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -31,6 +32,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertCatalogTableRule;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
+import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -39,6 +41,7 @@ import org.apache.commons.collections4.CollectionUtils;
import com.google.auto.service.AutoService;
import com.google.common.base.Throwables;
+import java.util.ArrayList;
import java.util.List;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
@@ -52,6 +55,7 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
private CatalogTable catalogTable;
private List<AssertFieldRule> assertFieldRules;
private List<AssertFieldRule.AssertRule> assertRowRules;
+ private AssertTableRule assertTableRule;
private AssertCatalogTableRule assertCatalogTableRule;
@@ -82,9 +86,17 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
assertCatalogTableRule.checkRule(catalogTable);
}
+ if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
+ assertTableRule =
+ new
AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
+ } else {
+ assertTableRule = new AssertTableRule(new ArrayList<>());
+ }
+
if (CollectionUtils.isEmpty(configList)
&& CollectionUtils.isEmpty(rowConfigList)
- && assertCatalogTableRule == null) {
+ && assertCatalogTableRule == null
+ && assertTableRule.getTableNames().isEmpty()) {
Throwables.propagateIfPossible(
new ConfigException.BadValue(
RULES.key(), "Assert rule config is empty, please
add rule config."));
@@ -103,7 +115,8 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
- return new AssertSinkWriter(seaTunnelRowType, assertFieldRules,
assertRowRules);
+ return new AssertSinkWriter(
+ seaTunnelRowType, assertFieldRules, assertRowRules,
assertTableRule);
}
@Override
@@ -123,7 +136,17 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
assertFieldRules = new AssertRuleParser().parseRules(configList);
}
- if (CollectionUtils.isEmpty(configList) &&
CollectionUtils.isEmpty(rowConfigList)) {
+ if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
+ assertTableRule =
+ new
AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
+ } else {
+ assertTableRule = new AssertTableRule(new ArrayList<>());
+ }
+
+ if (CollectionUtils.isEmpty(configList)
+ && CollectionUtils.isEmpty(rowConfigList)
+ && assertCatalogTableRule == null
+ && assertTableRule.getTableNames().isEmpty()) {
Throwables.propagateIfPossible(
new ConfigException.BadValue(
RULES.key(), "Assert rule config is empty, please
add rule config."));
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
index ee865ad9da..c0bc0d9718 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
@@ -23,10 +23,14 @@ import
org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecu
import
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.LongAccumulator;
public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -34,21 +38,26 @@ public class AssertSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final List<AssertFieldRule> assertFieldRules;
private final List<AssertFieldRule.AssertRule> assertRowRules;
+ private final AssertTableRule assertTableRule;
private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
private static final LongAccumulator LONG_ACCUMULATOR = new
LongAccumulator(Long::sum, 0);
+ private static final Set<String> TABLE_NAMES = new CopyOnWriteArraySet<>();
public AssertSinkWriter(
SeaTunnelRowType seaTunnelRowType,
List<AssertFieldRule> assertFieldRules,
- List<AssertFieldRule.AssertRule> assertRowRules) {
+ List<AssertFieldRule.AssertRule> assertRowRules,
+ AssertTableRule assertTableRule) {
this.seaTunnelRowType = seaTunnelRowType;
this.assertFieldRules = assertFieldRules;
this.assertRowRules = assertRowRules;
+ this.assertTableRule = assertTableRule;
}
@Override
public void write(SeaTunnelRow element) {
LONG_ACCUMULATOR.accumulate(1);
+ TABLE_NAMES.add(element.getTableId());
if (Objects.nonNull(assertFieldRules)) {
ASSERT_EXECUTOR
.fail(element, seaTunnelRowType, assertFieldRules)
@@ -89,5 +98,14 @@ public class AssertSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
+ failRule);
});
}
+ if (!assertTableRule.getTableNames().isEmpty()
+ && !new
HashSet<>(assertTableRule.getTableNames()).equals(TABLE_NAMES)) {
+ throw new AssertConnectorException(
+ AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
+ "table names: "
+ + TABLE_NAMES
+ + " is not equal to "
+ + assertTableRule.getTableNames());
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index ef6ad79d37..f3aeb8d88e 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -20,6 +20,9 @@ package org.apache.seatunnel.connectors.seatunnel.fake.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
@@ -146,6 +149,8 @@ public class FakeConfig implements Serializable {
private List<RowData> fakeRows;
+ @Builder.Default private List<TableIdentifier> tableIdentifiers = new
ArrayList<>();
+
// todo: use ReadonlyConfig
public static FakeConfig buildWithConfig(Config config) {
FakeConfigBuilder builder = FakeConfig.builder();
@@ -407,6 +412,14 @@ public class FakeConfig implements Serializable {
builder.doubleFakeMode(
FakeOption.FakeMode.parse(config.getString(DOUBLE_FAKE_MODE.key())));
}
+ if (config.hasPath(CatalogOptions.TABLE_NAMES.key())) {
+ List<String> tableNames =
config.getStringList(CatalogOptions.TABLE_NAMES.key());
+ List<TableIdentifier> tableIdentifiers = new
ArrayList<>(tableNames.size());
+ for (String tableName : tableNames) {
+ tableIdentifiers.add(TableIdentifier.of("fake",
TablePath.of(tableName)));
+ }
+ builder.tableIdentifiers(tableIdentifiers);
+ }
return builder.build();
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index d643cf8d9d..90a33304e3 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -32,6 +32,8 @@ import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
import
org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.commons.lang3.RandomUtils;
+
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
@@ -74,7 +76,16 @@ public class FakeDataGenerator {
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
randomRow.add(randomColumnValue(fieldType));
}
- return new SeaTunnelRow(randomRow.toArray());
+ SeaTunnelRow row = new SeaTunnelRow(randomRow.toArray());
+ if (!fakeConfig.getTableIdentifiers().isEmpty()) {
+ row.setTableId(
+ fakeConfig
+ .getTableIdentifiers()
+ .get(RandomUtils.nextInt(0,
fakeConfig.getTableIdentifiers().size()))
+ .toTablePath()
+ .toString());
+ }
+ return row;
}
/**
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index c03f9f9725..15118d2bff 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
@AutoService(SeaTunnelSource.class)
public class FakeSource
@@ -73,7 +74,20 @@ public class FakeSource
@Override
public List<CatalogTable> getProducedCatalogTables() {
- return Lists.newArrayList(catalogTable);
+ if (fakeConfig.getTableIdentifiers().isEmpty()) {
+ return Lists.newArrayList(catalogTable);
+ } else {
+ return fakeConfig.getTableIdentifiers().stream()
+ .map(
+ tableIdentifier ->
+ CatalogTable.of(
+ tableIdentifier,
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment()))
+ .collect(Collectors.toList());
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
index ee668fa929..91ef55950c 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -97,7 +98,8 @@ public class FakeSourceFactory implements TableSourceFactory {
DATE_DAY_TEMPLATE,
TIME_HOUR_TEMPLATE,
TIME_MINUTE_TEMPLATE,
- TIME_SECOND_TEMPLATE)
+ TIME_SECOND_TEMPLATE,
+ CatalogOptions.TABLE_NAMES)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithTableNamesTT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithTableNamesTT.java
new file mode 100644
index 0000000000..50e58b1ae3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithTableNamesTT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.fake;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support
CatalogTable")
+public class FakeWithTableNamesTT extends TestSuiteBase {
+ @TestTemplate
+ public void testFakeConnector(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult fakeWithTableNames =
+ container.executeJob("/fake_to_assert_with_tablenames.conf");
+ Assertions.assertEquals(0, fakeWithTableNames.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
new file mode 100644
index 0000000000..32fb751610
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_tablenames.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100
+ table-names = ["test.table1", "test.table2"]
+ schema = {
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ {
+ name = name
+ type = string
+ }
+ {
+ name = age
+ type = int
+ }
+ ]
+ primaryKey = {
+ name = "primary key"
+ columnNames = ["id"]
+ }
+ constraintKeys = [
+ {
+ constraintName = "unique_name"
+ constraintType = UNIQUE_KEY
+ constraintColumns = [
+ {
+ columnName = "id"
+ sortType = ASC
+ }
+ ]
+ }
+ ]
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink{
+ Assert {
+ rules {
+ table-names = ["test.table1", "test.table2"]
+ }
+ }
+}
\ No newline at end of file