This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c4778a2497 [Feature][Connector-V2] Assert support multi-table check 
(#7687)
c4778a2497 is described below

commit c4778a249732ff14b481f800bed74d56793f27b9
Author: Jia Fan <[email protected]>
AuthorDate: Wed Sep 25 10:26:19 2024 +0800

    [Feature][Connector-V2] Assert support multi-table check (#7687)
---
 docs/en/connector-v2/sink/Assert.md                | 133 ++++++++++++--
 docs/{en => zh}/connector-v2/sink/Assert.md        | 196 ++++++++++++++++-----
 .../seatunnel/assertion/sink/AssertConfig.java     |  14 ++
 .../seatunnel/assertion/sink/AssertSink.java       |  81 ++++++---
 .../seatunnel/assertion/sink/AssertSinkWriter.java | 121 +++++++++----
 .../connector/assertion/FakeSourceToAssertIT.java  |  12 ++
 .../fakesource_to_multi_table_assert.conf          | 122 +++++++++++++
 7 files changed, 565 insertions(+), 114 deletions(-)

diff --git a/docs/en/connector-v2/sink/Assert.md 
b/docs/en/connector-v2/sink/Assert.md
index e1b93598a4..bc5b4c1bf3 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -12,7 +12,7 @@ A sink plugin which can assert illegal data by user defined 
rules
 
 ## Options
 
-|                                              Name                            
                  |                      Type                       | Required 
| Default |
+| Name                                                                         
                  | Type                                            | Required 
| Default |
 
|------------------------------------------------------------------------------------------------|-------------------------------------------------|----------|---------|
 | rules                                                                        
                  | ConfigMap                                       | yes      
| -       |
 | rules.field_rules                                                            
                  | string                                          | yes      
| -       |
@@ -43,6 +43,8 @@ A sink plugin which can assert illegal data by user defined 
rules
 | rules.catalog_table_rule.column_rule.default_value                           
                  | string                                          | no       
| -       |
 | rules.catalog_table_rule.column_rule.comment                                 
                  | comment                                         | no       
| -       |
 | rules.table-names                                                            
                  | ConfigList                                      | no       
| -       |
+| rules.tables_configs                                                         
                  | ConfigList                                      | no       
| -       |
+| rules.tables_configs.table_path                                              
                  | String                                          | no       
| -       |
 | common-options                                                               
                  |                                                 | no       
| -       |
 
 ### rules [ConfigMap]
@@ -97,12 +99,21 @@ Used to assert the catalog table is same with the user 
defined table.
 
 Used to assert the table should be in the data.
 
+### tables_configs [ConfigList]
+
+Used to assert the multiple tables should be in the data.
+
+### table_path [String]
+
+The path of the table.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details
 
 ## Example
 
+### Simple
 the whole config obey with `hocon` style
 
 ```hocon
@@ -191,6 +202,8 @@ Assert {
   }
 ```
 
+### Complex
+
 Here is a more complex example about `equals_to`. The example involves 
FakeSource. You may want to learn it, please read this 
[document](../source/FakeSource.md).
 
 ```hocon
@@ -481,18 +494,116 @@ sink{
 }
 ```
 
-## Changelog
+### Assert Multiple Tables 
+
+check multiple tables
 
-### 2.2.0-beta 2022-09-26
+```hocon
+env {
+  parallelism = 1
+  job.mode = BATCH
+}
 
-- Add Assert Sink Connector
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        row.num = 16
+        schema {
+          table = "test.table1"
+          fields {
+            c_int = int
+            c_bigint = bigint
+          }
+        }
+      },
+      {
+        row.num = 17
+        schema {
+          table = "test.table2"
+          fields {
+            c_string = string
+            c_tinyint = tinyint
+          }
+        }
+      }
+    ]
+  }
+}
 
-### 2.3.0-beta 2022-10-20
+transform {
+}
 
-- [Improve] 1.Support check the number of rows 
([2844](https://github.com/apache/seatunnel/pull/2844)) 
([3031](https://github.com/apache/seatunnel/pull/3031)):
-  - check rows not empty
-  - check minimum number of rows
-  - check maximum number of rows
-- [Improve] 2.Support direct define of data values(row) 
([2844](https://github.com/apache/seatunnel/pull/2844)) 
([3031](https://github.com/apache/seatunnel/pull/3031))
-- [Improve] 3.Support setting parallelism as 1 
([2844](https://github.com/apache/seatunnel/pull/2844)) 
([3031](https://github.com/apache/seatunnel/pull/3031))
+sink {
+  Assert {
+    rules =
+      {
+        tables_configs = [
+          {
+            table_path = "test.table1"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 16
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 16
+              }
+            ],
+            field_rules = [{
+              field_name = c_int
+              field_type = int
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }, {
+              field_name = c_bigint
+              field_type = bigint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }]
+          },
+          {
+            table_path = "test.table2"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 17
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 17
+              }
+            ],
+            field_rules = [{
+              field_name = c_string
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }, {
+              field_name = c_tinyint
+              field_type = tinyint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }]
+          }
+        ]
+
+      }
+  }
+}
+
+```
 
diff --git a/docs/en/connector-v2/sink/Assert.md 
b/docs/zh/connector-v2/sink/Assert.md
similarity index 76%
copy from docs/en/connector-v2/sink/Assert.md
copy to docs/zh/connector-v2/sink/Assert.md
index e1b93598a4..ca6d419920 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/zh/connector-v2/sink/Assert.md
@@ -1,18 +1,18 @@
 # Assert
 
-> Assert sink connector
+> Assert 数据接收器
 
-## Description
+## 描述
 
-A sink plugin which can assert illegal data by user defined rules
+Assert 数据接收器是一个用于断言数据是否符合用户定义规则的数据接收器。用户可以通过配置规则来断言数据是否符合预期,如果数据不符合规则,将会抛出异常。
 
-## Key Features
+## 核心特性
 
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [精准一次](../../concept/connector-v2-features.md)
 
-## Options
+## 配置
 
-|                                              Name                            
                  |                      Type                       | Required 
| Default |
+| Name                                                                         
                  | Type                                            | Required 
| Default |
 
|------------------------------------------------------------------------------------------------|-------------------------------------------------|----------|---------|
 | rules                                                                        
                  | ConfigMap                                       | yes      
| -       |
 | rules.field_rules                                                            
                  | string                                          | yes      
| -       |
@@ -43,67 +43,79 @@ A sink plugin which can assert illegal data by user defined 
rules
 | rules.catalog_table_rule.column_rule.default_value                           
                  | string                                          | no       
| -       |
 | rules.catalog_table_rule.column_rule.comment                                 
                  | comment                                         | no       
| -       |
 | rules.table-names                                                            
                  | ConfigList                                      | no       
| -       |
+| rules.tables_configs                                                         
                  | ConfigList                                      | no       
| -       |
+| rules.tables_configs.table_path                                              
                  | String                                          | no       
| -       |
 | common-options                                                               
                  |                                                 | no       
| -       |
 
 ### rules [ConfigMap]
 
-Rule definition of user's available data.  Each rule represents one field 
validation or row num validation.
+规则定义用户可用数据的规则。每个规则代表一个字段验证或行数量验证。
 
 ### field_rules [ConfigList]
 
-field rules for field validation
+字段规则用于字段验证
 
 ### field_name [string]
 
-field name(string)
+字段名
 
 ### field_type [string | ConfigMap]
 
-Field type declarations should adhere to this 
[guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+字段类型。字段类型应符合此[指南](../../concept/schema-feature.md#如何声明支持的类型)。
 
 ### field_value [ConfigList]
 
-A list value rule define the data value validation
+字段值规则定义数据值验证
 
 ### rule_type [string]
 
-The following rules are supported for now
-- NOT_NULL `value can't be null`
-- NULL `value can be null`
-- MIN `define the minimum value of data`
-- MAX `define the maximum value of data`
-- MIN_LENGTH `define the minimum string length of a string data`
-- MAX_LENGTH `define the maximum string length of a string data`
-- MIN_ROW `define the minimun number of rows`
-- MAX_ROW `define the maximum number of rows`
+规则类型。目前支持以下规则
+- NOT_NULL `值不能为空`
+- NULL `值可以为空`
+- MIN `定义数据的最小值`
+- MAX `定义数据的最大值`
+- MIN_LENGTH `定义字符串数据的最小长度`
+- MAX_LENGTH `定义字符串数据的最大长度`
+- MIN_ROW `定义最小行数`
+- MAX_ROW `定义最大行数`
 
 ### rule_value [numeric]
 
-The value related to rule type. When the `rule_type` is `MIN`, `MAX`, 
`MIN_LENGTH`, `MAX_LENGTH`, `MIN_ROW` or `MAX_ROW`, users need to assign a 
value to the `rule_value`.
+与规则类型相关的值。当`rule_type`为`MIN`、`MAX`、`MIN_LENGTH`、`MAX_LENGTH`、`MIN_ROW`或`MAX_ROW`时,用户需要为`rule_value`分配一个值。
 
 ### equals_to [boolean | numeric | string | ConfigList | ConfigMap]
 
-`equals_to` is used to compare whether the field value is equal to the 
configured expected value. You can assign values of all types to `equals_to`. 
These types are detailed 
[here](../../concept/schema-feature.md#what-type-supported-at-now). For 
instance, if one field is a row with three fields, and the declaration of row 
type is `{a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = 
string}}`, users can assign the value `[["a", "b"], { k0 = 9999.99, k1 = 111.11 
}, [123, [...]
+`equals_to`用于比较字段值是否等于配置的预期值。用户可以将所有类型的值分配给`equals_to`。这些类型在[这里](../../concept/schema-feature.md#目前支持哪些类型)有详细说明。
+例如,如果一个字段是一个包含三个字段的行,行类型的声明是`{a = array<string>, b = map<string, decimal(30, 
2)>, c={c_0 = int, b = string}}`,用户可以将值`[["a", "b"], { k0 = 9999.99, k1 = 
111.11 }, [123, "abcd"]]`分配给`equals_to`。
 
-> The way of defining field values is consistent with 
[FakeSource](../source/FakeSource.md#customize-the-data-content-simple).
->
-> `equals_to` cannot be applied to `null` type fields. However, users can use 
the rule type `NULL` for verification, such as `{rule_type = NULL}`.
+> 
定义字段值的方式与[FakeSource](../../../en/connector-v2/source/FakeSource.md#customize-the-data-content-simple)一致。
+> 
+> `equals_to`不能应用于`null`类型字段。但是,用户可以使用规则类型`NULL`进行验证,例如`{rule_type = NULL}`。
 
 ### catalog_table_rule [ConfigMap]
 
-Used to assert the catalog table is same with the user defined table.
+catalog_table_rule用于断言Catalog表是否与用户定义的表相同。
 
 ### table-names [ConfigList]
 
-Used to assert the table should be in the data.
+用于断言表是否在数据中。
+
+### tables_configs [ConfigList]
+
+用于断言多个表是否在数据中。
+
+### table_path [String]
+
+表的路径。
 
 ### common options
 
-Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details
+Sink 插件的通用参数,请参考 [Sink Common Options](../sink-common-options.md) 了解详情
 
-## Example
+## 示例
 
-the whole config obey with `hocon` style
+### 简单
+整个Config遵循`hocon`风格
 
 ```hocon
 Assert {
@@ -191,7 +203,9 @@ Assert {
   }
 ```
 
-Here is a more complex example about `equals_to`. The example involves 
FakeSource. You may want to learn it, please read this 
[document](../source/FakeSource.md).
+### 复杂
+
+这里有一个更复杂的例子,涉及到`equals_to`。
 
 ```hocon
 source {
@@ -481,18 +495,116 @@ sink{
 }
 ```
 
-## Changelog
+### 验证多表
+
+验证多个表
 
-### 2.2.0-beta 2022-09-26
+```hocon
+env {
+  parallelism = 1
+  job.mode = BATCH
+}
 
-- Add Assert Sink Connector
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        row.num = 16
+        schema {
+          table = "test.table1"
+          fields {
+            c_int = int
+            c_bigint = bigint
+          }
+        }
+      },
+      {
+        row.num = 17
+        schema {
+          table = "test.table2"
+          fields {
+            c_string = string
+            c_tinyint = tinyint
+          }
+        }
+      }
+    ]
+  }
+}
 
-### 2.3.0-beta 2022-10-20
+transform {
+}
 
-- [Improve] 1.Support check the number of rows 
([2844](https://github.com/apache/seatunnel/pull/2844)) 
([3031](https://github.com/apache/seatunnel/pull/3031)):
-  - check rows not empty
-  - check minimum number of rows
-  - check maximum number of rows
-- [Improve] 2.Support direct define of data values(row) 
([2844](https://github.com/apache/seatunnel/pull/2844)) 
([3031](https://github.com/apache/seatunnel/pull/3031))
-- [Improve] 3.Support setting parallelism as 1 
([2844](https://github.com/apache/seatunnel/pull/2844)) 
([3031](https://github.com/apache/seatunnel/pull/3031))
+sink {
+  Assert {
+    rules =
+      {
+        tables_configs = [
+          {
+            table_path = "test.table1"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 16
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 16
+              }
+            ],
+            field_rules = [{
+              field_name = c_int
+              field_type = int
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }, {
+              field_name = c_bigint
+              field_type = bigint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }]
+          },
+          {
+            table_path = "test.table2"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 17
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 17
+              }
+            ],
+            field_rules = [{
+              field_name = c_string
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }, {
+              field_name = c_tinyint
+              field_type = tinyint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }]
+          }
+        ]
+
+      }
+  }
+}
+
+```
 
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
index d86c034d68..d9fcea69ae 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.util.List;
 import java.util.Map;
 
 public class AssertConfig {
@@ -83,4 +84,17 @@ public class AssertConfig {
                     .noDefaultValue()
                     .withDescription(
                             "Rule definition of user's available data. Each 
rule represents one field validation or row num validation.");
+
+    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+            Options.key("tables_configs")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table configuration for the sink. Each table 
configuration contains the table name and the rules for the table.");
+
+    public static final Option<String> TABLE_PATH =
+            Options.key("table_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("table full path");
 }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 931555857d..927943adc2 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -34,49 +34,49 @@ import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 
-import org.apache.commons.collections4.CollectionUtils;
-
 import com.google.common.base.Throwables;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_CONFIGS;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_PATH;
 
 public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
-    private SeaTunnelRowType seaTunnelRowType;
-    private List<AssertFieldRule> assertFieldRules;
-    private List<AssertFieldRule.AssertRule> assertRowRules;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Map<String, List<AssertFieldRule>> assertFieldRules;
+    private final Map<String, List<AssertFieldRule.AssertRule>> assertRowRules;
     private final AssertTableRule assertTableRule;
-    private AssertCatalogTableRule assertCatalogTableRule;
+    private final Map<String, AssertCatalogTableRule> assertCatalogTableRule;
+    private final String catalogTableName;
 
     public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         if (!pluginConfig.getOptional(RULES).isPresent()) {
-            Throwables.propagateIfPossible(new 
ConfigException.Missing(RULES.key()));
+            Throwables.throwIfUnchecked(new 
ConfigException.Missing(RULES.key()));
         }
+        assertFieldRules = new ConcurrentHashMap<>();
+        assertRowRules = new ConcurrentHashMap<>();
+        assertCatalogTableRule = new ConcurrentHashMap<>();
         Config ruleConfig = ConfigFactory.parseMap(pluginConfig.get(RULES));
-        List<? extends Config> rowConfigList = null;
-        List<? extends Config> configList = null;
-        if (ruleConfig.hasPath(ROW_RULES)) {
-            rowConfigList = ruleConfig.getConfigList(ROW_RULES);
-            assertRowRules = new 
AssertRuleParser().parseRowRules(rowConfigList);
-        }
-        if (ruleConfig.hasPath(FIELD_RULES)) {
-            configList = ruleConfig.getConfigList(FIELD_RULES);
-            assertFieldRules = new AssertRuleParser().parseRules(configList);
-        }
-
-        if (ruleConfig.hasPath(CATALOG_TABLE_RULES)) {
-            assertCatalogTableRule =
-                    new AssertRuleParser()
-                            
.parseCatalogTableRule(ruleConfig.getConfig(CATALOG_TABLE_RULES));
-            assertCatalogTableRule.checkRule(catalogTable);
+        if (ruleConfig.hasPath(TABLE_CONFIGS.key())) {
+            List<? extends Config> tableConfigs = 
ruleConfig.getConfigList(TABLE_CONFIGS.key());
+            for (Config tableConfig : tableConfigs) {
+                String tableName = tableConfig.getString(TABLE_PATH.key());
+                initTableRule(catalogTable, tableConfig, tableName);
+            }
+        } else {
+            String tableName = catalogTable.getTablePath().getFullName();
+            initTableRule(catalogTable, ruleConfig, tableName);
         }
+        catalogTableName = catalogTable.getTablePath().getFullName();
 
         if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
             assertTableRule =
@@ -85,20 +85,45 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
             assertTableRule = new AssertTableRule(new ArrayList<>());
         }
 
-        if (CollectionUtils.isEmpty(configList)
-                && CollectionUtils.isEmpty(rowConfigList)
-                && assertCatalogTableRule == null
+        if (assertRowRules.isEmpty()
+                && assertFieldRules.isEmpty()
+                && assertCatalogTableRule.isEmpty()
                 && assertTableRule.getTableNames().isEmpty()) {
-            Throwables.propagateIfPossible(
+            Throwables.throwIfUnchecked(
                     new ConfigException.BadValue(
                             RULES.key(), "Assert rule config is empty, please 
add rule config."));
         }
     }
 
+    private void initTableRule(CatalogTable catalogTable, Config tableConfig, 
String tableName) {
+        List<? extends Config> rowConfigList;
+        List<? extends Config> configList;
+        if (tableConfig.hasPath(ROW_RULES)) {
+            rowConfigList = tableConfig.getConfigList(ROW_RULES);
+            assertRowRules.put(tableName, new 
AssertRuleParser().parseRowRules(rowConfigList));
+        }
+        if (tableConfig.hasPath(FIELD_RULES)) {
+            configList = tableConfig.getConfigList(FIELD_RULES);
+            assertFieldRules.put(tableName, new 
AssertRuleParser().parseRules(configList));
+        }
+
+        if (tableConfig.hasPath(CATALOG_TABLE_RULES)) {
+            AssertCatalogTableRule catalogTableRule =
+                    new AssertRuleParser()
+                            
.parseCatalogTableRule(tableConfig.getConfig(CATALOG_TABLE_RULES));
+            catalogTableRule.checkRule(catalogTable);
+            assertCatalogTableRule.put(tableName, catalogTableRule);
+        }
+    }
+
     @Override
     public AssertSinkWriter createWriter(SinkWriter.Context context) {
         return new AssertSinkWriter(
-                seaTunnelRowType, assertFieldRules, assertRowRules, 
assertTableRule);
+                seaTunnelRowType,
+                assertFieldRules,
+                assertRowRules,
+                assertTableRule,
+                catalogTableName);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
index 7d7968ad00..54e999bb0e 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
@@ -27,10 +27,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.LongAccumulator;
 
@@ -38,31 +42,55 @@ public class AssertSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
 
     private final SeaTunnelRowType seaTunnelRowType;
-    private final List<AssertFieldRule> assertFieldRules;
-    private final List<AssertFieldRule.AssertRule> assertRowRules;
+    private final Map<String, List<AssertFieldRule>> assertFieldRules;
+    private final Map<String, List<AssertFieldRule.AssertRule>> assertRowRules;
     private final AssertTableRule assertTableRule;
     private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
-    private static final LongAccumulator LONG_ACCUMULATOR = new 
LongAccumulator(Long::sum, 0);
+    private static final Map<String, LongAccumulator> LONG_ACCUMULATOR = new 
ConcurrentHashMap<>();
     private static final Set<String> TABLE_NAMES = new CopyOnWriteArraySet<>();
+    private final String catalogTableName;
 
     public AssertSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
-            List<AssertFieldRule> assertFieldRules,
-            List<AssertFieldRule.AssertRule> assertRowRules,
-            AssertTableRule assertTableRule) {
+            Map<String, List<AssertFieldRule>> assertFieldRules,
+            Map<String, List<AssertFieldRule.AssertRule>> assertRowRules,
+            AssertTableRule assertTableRule,
+            String catalogTableName) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.assertFieldRules = assertFieldRules;
         this.assertRowRules = assertRowRules;
         this.assertTableRule = assertTableRule;
+        this.catalogTableName = catalogTableName;
     }
 
     @Override
     public void write(SeaTunnelRow element) {
-        LONG_ACCUMULATOR.accumulate(1);
         TABLE_NAMES.add(element.getTableId());
-        if (Objects.nonNull(assertFieldRules)) {
+        List<AssertFieldRule> assertFieldRule = null;
+        String tableName = null;
+        if (assertFieldRules.size() == 1) {
+            assertFieldRule = assertFieldRules.values().iterator().next();
+        }
+        if (assertRowRules.size() == 1) {
+            tableName = assertRowRules.keySet().iterator().next();
+        }
+
+        if (StringUtils.isEmpty(tableName) && 
StringUtils.isNotEmpty(element.getTableId())) {
+            tableName = element.getTableId();
+        } else {
+            tableName = catalogTableName;
+        }
+
+        if (Objects.isNull(assertFieldRule)) {
+            assertFieldRule = assertFieldRules.get(tableName);
+        }
+
+        LONG_ACCUMULATOR
+                .computeIfAbsent(tableName, (k) -> new 
LongAccumulator(Long::sum, 0))
+                .accumulate(1);
+        if (Objects.nonNull(assertFieldRule)) {
             ASSERT_EXECUTOR
-                    .fail(element, seaTunnelRowType, assertFieldRules)
+                    .fail(element, seaTunnelRowType, assertFieldRule)
                     .ifPresent(
                             failRule -> {
                                 throw new AssertConnectorException(
@@ -74,30 +102,57 @@ public class AssertSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
 
     @Override
     public void close() {
-        if (Objects.nonNull(assertRowRules)) {
-            assertRowRules.stream()
-                    .filter(
-                            assertRule -> {
-                                switch (assertRule.getRuleType()) {
-                                    case MAX_ROW:
-                                        return !(LONG_ACCUMULATOR.longValue()
-                                                <= assertRule.getRuleValue());
-                                    case MIN_ROW:
-                                        return !(LONG_ACCUMULATOR.longValue()
-                                                >= assertRule.getRuleValue());
-                                    default:
-                                        return false;
-                                }
-                            })
-                    .findFirst()
-                    .ifPresent(
-                            failRule -> {
-                                throw new AssertConnectorException(
-                                        
AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
-                                        "row num :"
-                                                + LONG_ACCUMULATOR.longValue()
-                                                + " fail rule: "
-                                                + failRule);
+        if (!assertRowRules.isEmpty()) {
+            assertRowRules.entrySet().stream()
+                    .filter(entry -> !entry.getValue().isEmpty())
+                    .forEach(
+                            entry -> {
+                                List<AssertFieldRule.AssertRule> assertRules = 
entry.getValue();
+                                assertRules.stream()
+                                        .filter(
+                                                assertRule -> {
+                                                    long count;
+                                                    if 
(LONG_ACCUMULATOR.containsKey(
+                                                            entry.getKey())) {
+                                                        count =
+                                                                
LONG_ACCUMULATOR
+                                                                        
.get(entry.getKey())
+                                                                        
.longValue();
+                                                    } else {
+                                                        count = 0;
+                                                    }
+                                                    switch 
(assertRule.getRuleType()) {
+                                                        case MAX_ROW:
+                                                            return !(count
+                                                                    <= 
assertRule.getRuleValue());
+                                                        case MIN_ROW:
+                                                            return !(count
+                                                                    >= 
assertRule.getRuleValue());
+                                                        default:
+                                                            return false;
+                                                    }
+                                                })
+                                        .findFirst()
+                                        .ifPresent(
+                                                failRule -> {
+                                                    long count;
+                                                    if 
(LONG_ACCUMULATOR.containsKey(
+                                                            entry.getKey())) {
+                                                        count =
+                                                                
LONG_ACCUMULATOR
+                                                                        
.get(entry.getKey())
+                                                                        
.longValue();
+                                                    } else {
+                                                        count = 0;
+                                                    }
+                                                    throw new 
AssertConnectorException(
+                                                            
AssertConnectorErrorCode
+                                                                    
.RULE_VALIDATION_FAILED,
+                                                            "row num :"
+                                                                    + count
+                                                                    + " fail 
rule: "
+                                                                    + 
failRule);
+                                                });
                             });
         }
         if (!assertTableRule.getTableNames().isEmpty()
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
index 277d048a6d..f366f69b6e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
@@ -46,6 +46,18 @@ public class FakeSourceToAssertIT extends TestSuiteBase {
         Assertions.assertEquals(0, execResult.getExitCode());
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK},
+            disabledReason = "Currently FLINK unsupported multi table")
+    public void testFakeSourceToMultiAssertSink(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/assertion/fakesource_to_multi_table_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_multi_table_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_multi_table_assert.conf
new file mode 100644
index 0000000000..e3077cf387
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_multi_table_assert.conf
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = BATCH
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        row.num = 16
+        schema {
+          table = "test.table1"
+          fields {
+            c_int = int
+            c_bigint = bigint
+          }
+        }
+      },
+      {
+        row.num = 17
+        schema {
+          table = "test.table2"
+          fields {
+            c_string = string
+            c_tinyint = tinyint
+          }
+        }
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        tables_configs = [
+          {
+            table_path = "test.table1"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 16
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 16
+              }
+            ],
+            field_rules = [{
+              field_name = c_int
+              field_type = int
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }, {
+              field_name = c_bigint
+              field_type = bigint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }]
+          },
+          {
+            table_path = "test.table2"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 17
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 17
+              }
+            ],
+            field_rules = [{
+              field_name = c_string
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }, {
+              field_name = c_tinyint
+              field_type = tinyint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }]
+          }
+        ]
+
+      }
+  }
+}


Reply via email to