This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c4778a2497 [Feature][Connector-V2] Assert support multi-table check (#7687)
c4778a2497 is described below
commit c4778a249732ff14b481f800bed74d56793f27b9
Author: Jia Fan <[email protected]>
AuthorDate: Wed Sep 25 10:26:19 2024 +0800
[Feature][Connector-V2] Assert support multi-table check (#7687)
---
docs/en/connector-v2/sink/Assert.md | 133 ++++++++++++--
docs/{en => zh}/connector-v2/sink/Assert.md | 196 ++++++++++++++++-----
.../seatunnel/assertion/sink/AssertConfig.java | 14 ++
.../seatunnel/assertion/sink/AssertSink.java | 81 ++++++---
.../seatunnel/assertion/sink/AssertSinkWriter.java | 121 +++++++++----
.../connector/assertion/FakeSourceToAssertIT.java | 12 ++
.../fakesource_to_multi_table_assert.conf | 122 +++++++++++++
7 files changed, 565 insertions(+), 114 deletions(-)
diff --git a/docs/en/connector-v2/sink/Assert.md b/docs/en/connector-v2/sink/Assert.md
index e1b93598a4..bc5b4c1bf3 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -12,7 +12,7 @@ A sink plugin which can assert illegal data by user defined rules
## Options
-| Name | Type | Required | Default |
+| Name | Type | Required | Default |
|------------------------------------------------------------------------------------------------|-------------------------------------------------|----------|---------|
| rules | ConfigMap | yes | - |
| rules.field_rules | string | yes | - |
@@ -43,6 +43,8 @@ A sink plugin which can assert illegal data by user defined rules
| rules.catalog_table_rule.column_rule.default_value | string | no | - |
| rules.catalog_table_rule.column_rule.comment | comment | no | - |
| rules.table-names | ConfigList | no | - |
+| rules.tables_configs | ConfigList | no | - |
+| rules.tables_configs.table_path | String | no | - |
| common-options | | no | - |
### rules [ConfigMap]
@@ -97,12 +99,21 @@ Used to assert the catalog table is same with the user defined table.
Used to assert the table should be in the data.
+### tables_configs [ConfigList]
+
+Used to assert the multiple tables should be in the data.
+
+### table_path [String]
+
+The path of the table.
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details
## Example
+### Simple
the whole config obey with `hocon` style
```hocon
@@ -191,6 +202,8 @@ Assert {
}
```
+### Complex
+
Here is a more complex example about `equals_to`. The example involves FakeSource. You may want to learn it, please read this [document](../source/FakeSource.md).
```hocon
@@ -481,18 +494,116 @@ sink{
}
```
-## Changelog
+### Assert Multiple Tables
+
+check multiple tables
-### 2.2.0-beta 2022-09-26
+```hocon
+env {
+ parallelism = 1
+ job.mode = BATCH
+}
-- Add Assert Sink Connector
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 16
+ schema {
+ table = "test.table1"
+ fields {
+ c_int = int
+ c_bigint = bigint
+ }
+ }
+ },
+ {
+ row.num = 17
+ schema {
+ table = "test.table2"
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ }
+ }
+ }
+ ]
+ }
+}
-### 2.3.0-beta 2022-10-20
+transform {
+}
-- [Improve] 1.Support check the number of rows ([2844](https://github.com/apache/seatunnel/pull/2844)) ([3031](https://github.com/apache/seatunnel/pull/3031)):
- - check rows not empty
- - check minimum number of rows
- - check maximum number of rows
-- [Improve] 2.Support direct define of data values(row) ([2844](https://github.com/apache/seatunnel/pull/2844)) ([3031](https://github.com/apache/seatunnel/pull/3031))
-- [Improve] 3.Support setting parallelism as 1 ([2844](https://github.com/apache/seatunnel/pull/2844)) ([3031](https://github.com/apache/seatunnel/pull/3031))
+sink {
+ Assert {
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 16
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 16
+ }
+ ],
+ field_rules = [{
+ field_name = c_int
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = c_bigint
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }]
+ },
+ {
+ table_path = "test.table2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 17
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 17
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }]
+ }
+ ]
+
+ }
+ }
+}
+
+```
diff --git a/docs/en/connector-v2/sink/Assert.md b/docs/zh/connector-v2/sink/Assert.md
similarity index 76%
copy from docs/en/connector-v2/sink/Assert.md
copy to docs/zh/connector-v2/sink/Assert.md
index e1b93598a4..ca6d419920 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/zh/connector-v2/sink/Assert.md
@@ -1,18 +1,18 @@
# Assert
-> Assert sink connector
+> Assert 数据接收器
-## Description
+## 描述
-A sink plugin which can assert illegal data by user defined rules
+Assert 数据接收器是一个用于断言数据是否符合用户定义规则的数据接收器。用户可以通过配置规则来断言数据是否符合预期,如果数据不符合规则,将会抛出异常。
-## Key Features
+## 核心特性
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [精准一次](../../concept/connector-v2-features.md)
-## Options
+## 配置
-| Name | Type | Required | Default |
+| Name | Type | Required | Default |
|------------------------------------------------------------------------------------------------|-------------------------------------------------|----------|---------|
| rules | ConfigMap | yes | - |
| rules.field_rules | string | yes | - |
@@ -43,67 +43,79 @@ A sink plugin which can assert illegal data by user defined rules
| rules.catalog_table_rule.column_rule.default_value | string | no | - |
| rules.catalog_table_rule.column_rule.comment | comment | no | - |
| rules.table-names | ConfigList | no | - |
+| rules.tables_configs | ConfigList | no | - |
+| rules.tables_configs.table_path | String | no | - |
| common-options | | no | - |
### rules [ConfigMap]
-Rule definition of user's available data. Each rule represents one field validation or row num validation.
+规则定义用户可用数据的规则。每个规则代表一个字段验证或行数量验证。
### field_rules [ConfigList]
-field rules for field validation
+字段规则用于字段验证
### field_name [string]
-field name(string)
+字段名
### field_type [string | ConfigMap]
-Field type declarations should adhere to this [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+字段类型。字段类型应符合此[指南](../../concept/schema-feature.md#如何声明支持的类型)。
### field_value [ConfigList]
-A list value rule define the data value validation
+字段值规则定义数据值验证
### rule_type [string]
-The following rules are supported for now
-- NOT_NULL `value can't be null`
-- NULL `value can be null`
-- MIN `define the minimum value of data`
-- MAX `define the maximum value of data`
-- MIN_LENGTH `define the minimum string length of a string data`
-- MAX_LENGTH `define the maximum string length of a string data`
-- MIN_ROW `define the minimun number of rows`
-- MAX_ROW `define the maximum number of rows`
+规则类型。目前支持以下规则
+- NOT_NULL `值不能为空`
+- NULL `值可以为空`
+- MIN `定义数据的最小值`
+- MAX `定义数据的最大值`
+- MIN_LENGTH `定义字符串数据的最小长度`
+- MAX_LENGTH `定义字符串数据的最大长度`
+- MIN_ROW `定义最小行数`
+- MAX_ROW `定义最大行数`
### rule_value [numeric]
-The value related to rule type. When the `rule_type` is `MIN`, `MAX`, `MIN_LENGTH`, `MAX_LENGTH`, `MIN_ROW` or `MAX_ROW`, users need to assign a value to the `rule_value`.
+与规则类型相关的值。当`rule_type`为`MIN`、`MAX`、`MIN_LENGTH`、`MAX_LENGTH`、`MIN_ROW`或`MAX_ROW`时,用户需要为`rule_value`分配一个值。
### equals_to [boolean | numeric | string | ConfigList | ConfigMap]
-`equals_to` is used to compare whether the field value is equal to the configured expected value. You can assign values of all types to `equals_to`. These types are detailed [here](../../concept/schema-feature.md#what-type-supported-at-now). For instance, if one field is a row with three fields, and the declaration of row type is `{a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}`, users can assign the value `[["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, [...]
+`equals_to`用于比较字段值是否等于配置的预期值。用户可以将所有类型的值分配给`equals_to`。这些类型在[这里](../../concept/schema-feature.md#目前支持哪些类型)有详细说明。
+例如,如果一个字段是一个包含三个字段的行,行类型的声明是`{a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}`,用户可以将值`[["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]]`分配给`equals_to`。
-> The way of defining field values is consistent with [FakeSource](../source/FakeSource.md#customize-the-data-content-simple).
->
-> `equals_to` cannot be applied to `null` type fields. However, users can use the rule type `NULL` for verification, such as `{rule_type = NULL}`.
+> 定义字段值的方式与[FakeSource](../../../en/connector-v2/source/FakeSource.md#customize-the-data-content-simple)一致。
+>
+> `equals_to`不能应用于`null`类型字段。但是,用户可以使用规则类型`NULL`进行验证,例如`{rule_type = NULL}`。
### catalog_table_rule [ConfigMap]
-Used to assert the catalog table is same with the user defined table.
+catalog_table_rule用于断言Catalog表是否与用户定义的表相同。
### table-names [ConfigList]
-Used to assert the table should be in the data.
+用于断言表是否在数据中。
+
+### tables_configs [ConfigList]
+
+用于断言多个表是否在数据中。
+
+### table_path [String]
+
+表的路径。
### common options
-Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details
+Sink 插件的通用参数,请参考 [Sink Common Options](../sink-common-options.md) 了解详情
-## Example
+## 示例
-the whole config obey with `hocon` style
+### 简单
+整个Config遵循`hocon`风格
```hocon
Assert {
@@ -191,7 +203,9 @@ Assert {
}
```
-Here is a more complex example about `equals_to`. The example involves FakeSource. You may want to learn it, please read this [document](../source/FakeSource.md).
+### 复杂
+
+这里有一个更复杂的例子,涉及到`equals_to`。
```hocon
source {
@@ -481,18 +495,116 @@ sink{
}
```
-## Changelog
+### 验证多表
+
+验证多个表
-### 2.2.0-beta 2022-09-26
+```hocon
+env {
+ parallelism = 1
+ job.mode = BATCH
+}
-- Add Assert Sink Connector
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 16
+ schema {
+ table = "test.table1"
+ fields {
+ c_int = int
+ c_bigint = bigint
+ }
+ }
+ },
+ {
+ row.num = 17
+ schema {
+ table = "test.table2"
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ }
+ }
+ }
+ ]
+ }
+}
-### 2.3.0-beta 2022-10-20
+transform {
+}
-- [Improve] 1.Support check the number of rows ([2844](https://github.com/apache/seatunnel/pull/2844)) ([3031](https://github.com/apache/seatunnel/pull/3031)):
- - check rows not empty
- - check minimum number of rows
- - check maximum number of rows
-- [Improve] 2.Support direct define of data values(row) ([2844](https://github.com/apache/seatunnel/pull/2844)) ([3031](https://github.com/apache/seatunnel/pull/3031))
-- [Improve] 3.Support setting parallelism as 1 ([2844](https://github.com/apache/seatunnel/pull/2844)) ([3031](https://github.com/apache/seatunnel/pull/3031))
+sink {
+ Assert {
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 16
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 16
+ }
+ ],
+ field_rules = [{
+ field_name = c_int
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = c_bigint
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }]
+ },
+ {
+ table_path = "test.table2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 17
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 17
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }]
+ }
+ ]
+
+ }
+ }
+}
+
+```
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
index d86c034d68..d9fcea69ae 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.util.List;
import java.util.Map;
public class AssertConfig {
@@ -83,4 +84,17 @@ public class AssertConfig {
.noDefaultValue()
.withDescription(
"Rule definition of user's available data. Each rule represents one field validation or row num validation.");
+
+ public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+ Options.key("tables_configs")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription(
+ "Table configuration for the sink. Each table configuration contains the table name and the rules for the table.");
+
+ public static final Option<String> TABLE_PATH =
+ Options.key("table_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("table full path");
}
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 931555857d..927943adc2 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -34,49 +34,49 @@ import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import org.apache.commons.collections4.CollectionUtils;
-
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
+import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_CONFIGS;
+import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_PATH;
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
- private SeaTunnelRowType seaTunnelRowType;
- private List<AssertFieldRule> assertFieldRules;
- private List<AssertFieldRule.AssertRule> assertRowRules;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final Map<String, List<AssertFieldRule>> assertFieldRules;
+ private final Map<String, List<AssertFieldRule.AssertRule>> assertRowRules;
private final AssertTableRule assertTableRule;
- private AssertCatalogTableRule assertCatalogTableRule;
+ private final Map<String, AssertCatalogTableRule> assertCatalogTableRule;
+ private final String catalogTableName;
public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
if (!pluginConfig.getOptional(RULES).isPresent()) {
- Throwables.propagateIfPossible(new ConfigException.Missing(RULES.key()));
+ Throwables.throwIfUnchecked(new ConfigException.Missing(RULES.key()));
}
+ assertFieldRules = new ConcurrentHashMap<>();
+ assertRowRules = new ConcurrentHashMap<>();
+ assertCatalogTableRule = new ConcurrentHashMap<>();
Config ruleConfig = ConfigFactory.parseMap(pluginConfig.get(RULES));
- List<? extends Config> rowConfigList = null;
- List<? extends Config> configList = null;
- if (ruleConfig.hasPath(ROW_RULES)) {
- rowConfigList = ruleConfig.getConfigList(ROW_RULES);
- assertRowRules = new AssertRuleParser().parseRowRules(rowConfigList);
- }
- if (ruleConfig.hasPath(FIELD_RULES)) {
- configList = ruleConfig.getConfigList(FIELD_RULES);
- assertFieldRules = new AssertRuleParser().parseRules(configList);
- }
-
- if (ruleConfig.hasPath(CATALOG_TABLE_RULES)) {
- assertCatalogTableRule =
- new AssertRuleParser()
- .parseCatalogTableRule(ruleConfig.getConfig(CATALOG_TABLE_RULES));
- assertCatalogTableRule.checkRule(catalogTable);
+ if (ruleConfig.hasPath(TABLE_CONFIGS.key())) {
+ List<? extends Config> tableConfigs = ruleConfig.getConfigList(TABLE_CONFIGS.key());
+ for (Config tableConfig : tableConfigs) {
+ String tableName = tableConfig.getString(TABLE_PATH.key());
+ initTableRule(catalogTable, tableConfig, tableName);
+ }
+ } else {
+ String tableName = catalogTable.getTablePath().getFullName();
+ initTableRule(catalogTable, ruleConfig, tableName);
}
+ catalogTableName = catalogTable.getTablePath().getFullName();
if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
assertTableRule =
@@ -85,20 +85,45 @@ public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
assertTableRule = new AssertTableRule(new ArrayList<>());
}
- if (CollectionUtils.isEmpty(configList)
- && CollectionUtils.isEmpty(rowConfigList)
- && assertCatalogTableRule == null
+ if (assertRowRules.isEmpty()
+ && assertFieldRules.isEmpty()
+ && assertCatalogTableRule.isEmpty()
&& assertTableRule.getTableNames().isEmpty()) {
- Throwables.propagateIfPossible(
+ Throwables.throwIfUnchecked(
new ConfigException.BadValue(
RULES.key(), "Assert rule config is empty, please add rule config."));
}
}
+ private void initTableRule(CatalogTable catalogTable, Config tableConfig, String tableName) {
+ List<? extends Config> rowConfigList;
+ List<? extends Config> configList;
+ if (tableConfig.hasPath(ROW_RULES)) {
+ rowConfigList = tableConfig.getConfigList(ROW_RULES);
+ assertRowRules.put(tableName, new AssertRuleParser().parseRowRules(rowConfigList));
+ }
+ if (tableConfig.hasPath(FIELD_RULES)) {
+ configList = tableConfig.getConfigList(FIELD_RULES);
+ assertFieldRules.put(tableName, new AssertRuleParser().parseRules(configList));
+ }
+
+ if (tableConfig.hasPath(CATALOG_TABLE_RULES)) {
+ AssertCatalogTableRule catalogTableRule =
+ new AssertRuleParser()
+ .parseCatalogTableRule(tableConfig.getConfig(CATALOG_TABLE_RULES));
+ catalogTableRule.checkRule(catalogTable);
+ assertCatalogTableRule.put(tableName, catalogTableRule);
+ }
+ }
+
@Override
public AssertSinkWriter createWriter(SinkWriter.Context context) {
return new AssertSinkWriter(
- seaTunnelRowType, assertFieldRules, assertRowRules, assertTableRule);
+ seaTunnelRowType,
+ assertFieldRules,
+ assertRowRules,
+ assertTableRule,
+ catalogTableName);
}
@Override
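The constructor change above keys every rule set by table name: one entry per `tables_configs` item when that option is present, otherwise a single entry under the upstream catalog table's full name. The sketch below mirrors only that branching, using plain `java.util` maps in place of the Typesafe `Config` objects. `TableRuleIndex` and `index` are hypothetical names introduced for illustration and are not part of SeaTunnel.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the connector.
class TableRuleIndex {
    static final String TABLES_CONFIGS = "tables_configs";
    static final String TABLE_PATH = "table_path";

    // Returns table path -> the rule block that applies to that table.
    static Map<String, Map<String, Object>> index(
            Map<String, Object> rules, String defaultTablePath) {
        Map<String, Map<String, Object>> byTable = new LinkedHashMap<>();
        Object tables = rules.get(TABLES_CONFIGS);
        if (tables instanceof List) {
            // Multi-table form: every entry names its own table_path.
            for (Object entry : (List<?>) tables) {
                @SuppressWarnings("unchecked")
                Map<String, Object> tableConfig = (Map<String, Object>) entry;
                byTable.put(String.valueOf(tableConfig.get(TABLE_PATH)), tableConfig);
            }
        } else {
            // Single-table form: the whole rules block belongs to the upstream table.
            byTable.put(defaultTablePath, rules);
        }
        return byTable;
    }
}
```

With the multi-table example config from this commit, such an index would hold two keys, `test.table1` and `test.table2`, each pointing at its own `row_rules` and `field_rules`.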
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
index 7d7968ad00..54e999bb0e 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
@@ -27,10 +27,14 @@ import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.LongAccumulator;
@@ -38,31 +42,55 @@ public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
private final SeaTunnelRowType seaTunnelRowType;
- private final List<AssertFieldRule> assertFieldRules;
- private final List<AssertFieldRule.AssertRule> assertRowRules;
+ private final Map<String, List<AssertFieldRule>> assertFieldRules;
+ private final Map<String, List<AssertFieldRule.AssertRule>> assertRowRules;
private final AssertTableRule assertTableRule;
private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
- private static final LongAccumulator LONG_ACCUMULATOR = new LongAccumulator(Long::sum, 0);
+ private static final Map<String, LongAccumulator> LONG_ACCUMULATOR = new ConcurrentHashMap<>();
private static final Set<String> TABLE_NAMES = new CopyOnWriteArraySet<>();
+ private final String catalogTableName;
public AssertSinkWriter(
SeaTunnelRowType seaTunnelRowType,
- List<AssertFieldRule> assertFieldRules,
- List<AssertFieldRule.AssertRule> assertRowRules,
- AssertTableRule assertTableRule) {
+ Map<String, List<AssertFieldRule>> assertFieldRules,
+ Map<String, List<AssertFieldRule.AssertRule>> assertRowRules,
+ AssertTableRule assertTableRule,
+ String catalogTableName) {
this.seaTunnelRowType = seaTunnelRowType;
this.assertFieldRules = assertFieldRules;
this.assertRowRules = assertRowRules;
this.assertTableRule = assertTableRule;
+ this.catalogTableName = catalogTableName;
}
@Override
public void write(SeaTunnelRow element) {
- LONG_ACCUMULATOR.accumulate(1);
TABLE_NAMES.add(element.getTableId());
- if (Objects.nonNull(assertFieldRules)) {
+ List<AssertFieldRule> assertFieldRule = null;
+ String tableName = null;
+ if (assertFieldRules.size() == 1) {
+ assertFieldRule = assertFieldRules.values().iterator().next();
+ }
+ if (assertRowRules.size() == 1) {
+ tableName = assertRowRules.keySet().iterator().next();
+ }
+
+ if (StringUtils.isEmpty(tableName) && StringUtils.isNotEmpty(element.getTableId())) {
+ tableName = element.getTableId();
+ } else {
+ tableName = catalogTableName;
+ }
+
+ if (Objects.isNull(assertFieldRule)) {
+ assertFieldRule = assertFieldRules.get(tableName);
+ }
+
+ LONG_ACCUMULATOR
+ .computeIfAbsent(tableName, (k) -> new LongAccumulator(Long::sum, 0))
+ .accumulate(1);
+ if (Objects.nonNull(assertFieldRule)) {
ASSERT_EXECUTOR
- .fail(element, seaTunnelRowType, assertFieldRules)
+ .fail(element, seaTunnelRowType, assertFieldRule)
.ifPresent(
failRule -> {
throw new AssertConnectorException(
@@ -74,30 +102,57 @@ public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
@Override
public void close() {
- if (Objects.nonNull(assertRowRules)) {
- assertRowRules.stream()
- .filter(
- assertRule -> {
- switch (assertRule.getRuleType()) {
- case MAX_ROW:
- return !(LONG_ACCUMULATOR.longValue()
- <= assertRule.getRuleValue());
- case MIN_ROW:
- return !(LONG_ACCUMULATOR.longValue()
- >= assertRule.getRuleValue());
- default:
- return false;
- }
- })
- .findFirst()
- .ifPresent(
- failRule -> {
- throw new AssertConnectorException(
- AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
- "row num :"
- + LONG_ACCUMULATOR.longValue()
- + " fail rule: "
- + failRule);
+ if (!assertRowRules.isEmpty()) {
+ assertRowRules.entrySet().stream()
+ .filter(entry -> !entry.getValue().isEmpty())
+ .forEach(
+ entry -> {
+ List<AssertFieldRule.AssertRule> assertRules = entry.getValue();
+ assertRules.stream()
+ .filter(
+ assertRule -> {
+ long count;
+ if (LONG_ACCUMULATOR.containsKey(
+ entry.getKey())) {
+ count =
+ LONG_ACCUMULATOR
+ .get(entry.getKey())
+ .longValue();
+ } else {
+ count = 0;
+ }
+ switch (assertRule.getRuleType()) {
+ case MAX_ROW:
+ return !(count
+ <= assertRule.getRuleValue());
+ case MIN_ROW:
+ return !(count
+ >= assertRule.getRuleValue());
+ default:
+ return false;
+ }
+ })
+ .findFirst()
+ .ifPresent(
+ failRule -> {
+ long count;
+ if (LONG_ACCUMULATOR.containsKey(
+ entry.getKey())) {
+ count =
+ LONG_ACCUMULATOR
+ .get(entry.getKey())
+ .longValue();
+ } else {
+ count = 0;
+ }
+ throw new AssertConnectorException(
+ AssertConnectorErrorCode
+ .RULE_VALIDATION_FAILED,
+ "row num :"
+ + count
+ + " fail rule: "
+ + failRule);
+ });
});
}
if (!assertTableRule.getTableNames().isEmpty()
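The writer change above replaces the single global `LongAccumulator` with one accumulator per table, created lazily with `computeIfAbsent`, and treats a table with no accumulator as having zero rows when `MIN_ROW` and `MAX_ROW` are evaluated at close. A minimal sketch of that counting pattern follows. `PerTableRowCounter` and its methods are hypothetical names, and the bounds are passed as plain `long` values rather than the connector's `AssertRule` objects.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAccumulator;

// Hypothetical stand-in for illustration; not the connector's AssertSinkWriter.
class PerTableRowCounter {
    private final Map<String, LongAccumulator> counters = new ConcurrentHashMap<>();

    // Count one row for the given table, creating its accumulator on first use.
    void record(String tableId) {
        counters.computeIfAbsent(tableId, k -> new LongAccumulator(Long::sum, 0)).accumulate(1);
    }

    // A table that never produced a row counts as zero.
    long count(String tableId) {
        LongAccumulator acc = counters.get(tableId);
        return acc == null ? 0L : acc.longValue();
    }

    // Returns a message for the first violated bound, or empty when
    // minRows <= count <= maxRows.
    Optional<String> violation(String tableId, long minRows, long maxRows) {
        long count = count(tableId);
        if (count > maxRows) {
            return Optional.of("row num :" + count + " fail rule: MAX_ROW " + maxRows);
        }
        if (count < minRows) {
            return Optional.of("row num :" + count + " fail rule: MIN_ROW " + minRows);
        }
        return Optional.empty();
    }
}
```

Setting `MIN_ROW` and `MAX_ROW` to the same value, as the example configs in this commit do with 16 and 17, pins the expected row count of each table exactly.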
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
index 277d048a6d..f366f69b6e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
@@ -46,6 +46,18 @@ public class FakeSourceToAssertIT extends TestSuiteBase {
Assertions.assertEquals(0, execResult.getExitCode());
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK},
+ disabledReason = "Currently FLINK unsupported multi table")
+ public void testFakeSourceToMultiAssertSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/assertion/fakesource_to_multi_table_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_multi_table_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_multi_table_assert.conf
new file mode 100644
index 0000000000..e3077cf387
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_multi_table_assert.conf
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = BATCH
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 16
+ schema {
+ table = "test.table1"
+ fields {
+ c_int = int
+ c_bigint = bigint
+ }
+ }
+ },
+ {
+ row.num = 17
+ schema {
+ table = "test.table2"
+ fields {
+ c_string = string
+ c_tinyint = tinyint
+ }
+ }
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 16
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 16
+ }
+ ],
+ field_rules = [{
+ field_name = c_int
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = c_bigint
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }]
+ },
+ {
+ table_path = "test.table2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 17
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 17
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }, {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }]
+ }
+ ]
+
+ }
+ }
+}