This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 87a065af82 [Feature][transform-v2] Data Validator Transform support
(#9445)
87a065af82 is described below
commit 87a065af82825ebae32ee44ea34c370c94bd721f
Author: CosmosNi <[email protected]>
AuthorDate: Thu Jul 31 21:43:10 2025 +0800
[Feature][transform-v2] Data Validator Transform support (#9445)
---
docs/en/transform-v2/data-validator.md | 333 ++++++++++++++++++
docs/zh/transform-v2/data-validator.md | 332 ++++++++++++++++++
plugin-mapping.properties | 2 +-
.../common/exception/CommonErrorCode.java | 3 +-
.../connector/TransformSpecificationCheckTest.java | 2 +-
.../e2e/transform/TestDataValidatorIT.java | 79 +++++
.../test/resources/data_validator_email_udf.conf | 111 ++++++
.../src/test/resources/data_validator_fail.conf | 78 +++++
.../resources/data_validator_route_to_table.conf | 177 ++++++++++
.../src/test/resources/data_validator_skip.conf | 137 ++++++++
.../src/test/resources/data_validator_valid.conf | 137 ++++++++
.../seatunnel/transform/common/ErrorHandleWay.java | 8 +-
.../transform/common/TransformCommonOptions.java | 12 +-
.../transform/exception/TransformCommonError.java | 8 +
.../transform/exception/TransformException.java | 2 +-
.../validator/DataValidatorTransform.java | 315 +++++++++++++++++
.../validator/DataValidatorTransformConfig.java | 380 +++++++++++++++++++++
.../validator/DataValidatorTransformFactory.java | 56 +++
.../transform/validator/FieldValidator.java | 72 ++++
.../transform/validator/ValidationContext.java | 47 +++
.../ValidationResult.java} | 38 ++-
.../validator/ValidationResultHandler.java | 74 ++++
.../validator/rule/LengthValidationRule.java | 112 ++++++
.../validator/rule/NotNullValidationRule.java | 60 ++++
.../validator/rule/RangeValidationRule.java | 97 ++++++
.../validator/rule/RegexValidationRule.java | 98 ++++++
.../validator/rule/UDFValidationRule.java | 136 ++++++++
.../transform/validator/rule/ValidationRule.java | 70 ++++
.../transform/validator/udf/DataValidatorUDF.java | 57 ++++
.../transform/validator/udf/EmailValidator.java | 106 ++++++
30 files changed, 3120 insertions(+), 19 deletions(-)
diff --git a/docs/en/transform-v2/data-validator.md
b/docs/en/transform-v2/data-validator.md
new file mode 100644
index 0000000000..dc9de28ddc
--- /dev/null
+++ b/docs/en/transform-v2/data-validator.md
@@ -0,0 +1,333 @@
+# DataValidator
+
+> Data validation transform plugin
+
+## Description
+
+The DataValidator transform validates field values according to configured
rules and handles validation failures based on the specified error handling
strategy. It supports multiple validation rule types including null checks,
range validation, length validation, and regex pattern matching.
+
+## Options
+
+| name | type | required | default value |
+|-----------------|--------|----------|---------------|
+| row_error_handle_way| enum | no | FAIL |
+| row_error_handle_way.error_table | string | no | |
+| field_rules | array | yes | |
+
+### row_error_handle_way [enum]
+
+Error handling strategy when validation fails:
+- `FAIL`: Fail the entire task when validation errors occur
+- `SKIP`: Skip invalid rows and continue processing
+- `ROUTE_TO_TABLE`: Route invalid data to a specified error table
+
+**Note**: `ROUTE_TO_TABLE` mode only works with sinks that support multiple
tables. The sink must be capable of handling data routed to different table
destinations.
+
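The three strategies amount to a per-row routing decision. The following is a minimal sketch of that decision, not the `DataValidatorTransform` implementation: `ErrorHandleSketch`, `Way`, `Outcome`, and `decide` are hypothetical names, a `null` target table stands for a dropped row, and the FAIL message only imitates the `VALIDATION_FAILED` error text.

```java
import java.util.List;

/** Hypothetical sketch of the three row_error_handle_way strategies for one row. */
final class ErrorHandleSketch {

    enum Way { FAIL, SKIP, ROUTE_TO_TABLE }

    /** Destination of a row; a null targetTable means the row is dropped. */
    static final class Outcome {
        final String targetTable;

        Outcome(String targetTable) {
            this.targetTable = targetTable;
        }
    }

    static Outcome decide(Way way, List<String> errors, String mainTable, String errorTable) {
        if (errors.isEmpty()) {
            return new Outcome(mainTable); // valid rows keep their original table
        }
        switch (way) {
            case FAIL:
                // Stop the whole job on the first invalid row.
                throw new IllegalStateException("Data validation failed: " + errors);
            case SKIP:
                return new Outcome(null); // drop the invalid row and continue
            case ROUTE_TO_TABLE:
                if (errorTable == null || errorTable.isEmpty()) {
                    throw new IllegalArgumentException(
                            "row_error_handle_way.error_table is required for ROUTE_TO_TABLE");
                }
                return new Outcome(errorTable); // the sink must accept this extra table
            default:
                throw new AssertionError("Unknown strategy: " + way);
        }
    }
}
```

Only the routing decision is modeled here; a row sent to the error table would still have to be converted to the fixed error table schema.
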
+### row_error_handle_way.error_table [string]
+
+Target table name for routing invalid data when `row_error_handle_way` is set
to `ROUTE_TO_TABLE`. This parameter is required when using `ROUTE_TO_TABLE`
mode.
+
+#### Error Table Schema
+
+When using `ROUTE_TO_TABLE` mode, DataValidator automatically creates an error
table with a fixed schema to store validation failure data. The error table
contains the following fields:
+
+| Field Name | Data Type | Description |
+|------------|-----------|-------------|
+| source_table_id | STRING | Identifier of the source table the row came from |
+| source_table_path | STRING | Full path of the source table |
+| original_data | STRING | JSON representation of the complete row that failed validation |
+| validation_errors | STRING | JSON array of validation error details, covering every failed field and its error message |
+| create_time | TIMESTAMP | Time at which the validation error was recorded |
+
+**Complete Error Table Record Example**:
+```json
+{
+ "source_table_id": "users_table",
+ "source_table_path": "database.users",
+ "original_data": "{\"id\": 123, \"name\": null, \"age\": 200, \"email\": \"invalid-email\"}",
+ "validation_errors": "[{\"field_name\": \"name\", \"error_message\": \"Field 'name' cannot be null\"}, {\"field_name\": \"age\", \"error_message\": \"Field 'age' value 200 is not within range [0, 150]\"}, {\"field_name\": \"email\", \"error_message\": \"Field 'email' does not match pattern '^[\\\\w-\\\\.]+@([\\\\w-]+\\\\.)+[\\\\w-]{2,4}$'\"}]",
+ "create_time": "2024-01-15T10:30:45"
+}
+```
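
The `validation_errors` column holds a JSON array serialized into one string. A hedged sketch of producing that string from (field name, message) pairs in the format shown above; `ValidationErrorsJson` is a hypothetical helper, and the real transform may well use a JSON library rather than hand-written escaping. A list is used instead of a map so that one field can report several failed rules.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: serializes (field name, message) pairs as a JSON array string. */
final class ValidationErrorsJson {

    /** Escapes the characters that JSON does not allow raw inside a string literal. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Produces [{"field_name": "...", "error_message": "..."}, ...] in input order. */
    static String toJson(List<Map.Entry<String, String>> errors) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < errors.size(); i++) {
            Map.Entry<String, String> e = errors.get(i);
            if (i > 0) {
                sb.append(", ");
            }
            sb.append("{\"field_name\": \"").append(escape(e.getKey()))
                    .append("\", \"error_message\": \"").append(escape(e.getValue()))
                    .append("\"}");
        }
        return sb.append(']').toString();
    }
}
```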
+
+**Data Routing Mechanism**:
+- Data that passes validation maintains the original schema and is routed to
the main output table
+- Data that fails validation is converted to the error table schema format
above and routed to the specified error table
+- Each validation failure row generates one record in the error table,
containing complete original data and detailed error information
+
+### field_rules [array]
+
+Array of field validation rules. Each rule defines validation criteria for a
specific field.
+
+#### Field Rule Structure
+
+Each field rule contains:
+- `field_name`: Name of the field to validate
+- `rules`: Array of validation rules to apply (nested format), or individual
rule properties (flat format)
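
The flat and nested formats describe the same thing: a flat entry carries one rule's properties next to `field_name`, while a nested entry lists several rules under `rules`. A minimal sketch of normalizing both shapes to a list of rules, assuming the parsed config is available as plain `Map`s (`FieldRuleNormalizer` and `rulesOf` are hypothetical names, not the `DataValidatorTransformConfig` API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical normalizer: both field_rules formats become a list of rule maps. */
final class FieldRuleNormalizer {

    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> rulesOf(Map<String, Object> fieldRule) {
        Object nested = fieldRule.get("rules");
        if (nested instanceof List) {
            // Nested format: rules = [ { rule_type = ... }, ... ]
            return (List<Map<String, Object>>) nested;
        }
        // Flat format: the entry itself is exactly one rule.
        Map<String, Object> single = new LinkedHashMap<>(fieldRule);
        single.remove("field_name"); // field_name is not a rule parameter
        List<Map<String, Object>> out = new ArrayList<>();
        out.add(single);
        return out;
    }
}
```

This is why Example 2 below can validate `name` twice with two flat entries: each flat entry contributes one rule for its field.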
+
+#### Validation Rule Types
+
+##### NOT_NULL
+Validates that a field value is not null.
+
+Parameters:
+- `rule_type`: "NOT_NULL"
+- `custom_message` (optional): Custom error message
+
+##### RANGE
+Validates that a numeric value is within a specified range.
+
+Parameters:
+- `rule_type`: "RANGE"
+- `min_value` (optional): Minimum allowed value
+- `max_value` (optional): Maximum allowed value
+- `min_inclusive` (optional): Whether minimum value is inclusive (default:
true)
+- `max_inclusive` (optional): Whether maximum value is inclusive (default:
true)
+- `custom_message` (optional): Custom error message
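
A sketch of the RANGE semantics, assuming values are compared as `BigDecimal` and that a missing bound leaves that side unbounded. `RangeCheck` is a hypothetical helper, not `RangeValidationRule`; its message imitates the error table example above, and null values pass on the assumption that null handling is left to NOT_NULL.

```java
import java.math.BigDecimal;

/** Hypothetical RANGE check: returns null when valid, otherwise an error message. */
final class RangeCheck {

    static String check(String field, Number value, BigDecimal min, BigDecimal max,
                        boolean minInclusive, boolean maxInclusive) {
        if (value == null) {
            return null; // assumption: nulls are NOT_NULL's concern
        }
        BigDecimal v = new BigDecimal(value.toString()); // exact decimal comparison
        boolean aboveMin = min == null
                || (minInclusive ? v.compareTo(min) >= 0 : v.compareTo(min) > 0);
        boolean belowMax = max == null
                || (maxInclusive ? v.compareTo(max) <= 0 : v.compareTo(max) < 0);
        if (aboveMin && belowMax) {
            return null;
        }
        // Brackets show inclusivity: [0, 150] inclusive, (0, 150) exclusive.
        return String.format("Field '%s' value %s is not within range %s%s, %s%s",
                field, value,
                minInclusive ? "[" : "(", min == null ? "-inf" : min.toPlainString(),
                max == null ? "+inf" : max.toPlainString(), maxInclusive ? "]" : ")");
    }
}
```

For example, `check("age", 200, BigDecimal.valueOf(0), BigDecimal.valueOf(150), true, true)` yields `Field 'age' value 200 is not within range [0, 150]`. Non-finite doubles (NaN, infinity) are not handled by this sketch.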
+
+##### LENGTH
+Validates the length of string, array, or collection values.
+
+Parameters:
+- `rule_type`: "LENGTH"
+- `min_length` (optional): Minimum allowed length
+- `max_length` (optional): Maximum allowed length
+- `exact_length` (optional): Exact required length
+- `custom_message` (optional): Custom error message
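
A minimal sketch of LENGTH checking. Several points are assumptions, not confirmed behavior of `LengthValidationRule`: length means UTF-16 code units for strings and element count for arrays and collections, `exact_length` overrides `min_length`/`max_length` when set, and unsupported types fail. `LengthCheck` is a hypothetical helper.

```java
import java.lang.reflect.Array;
import java.util.Collection;

/** Hypothetical LENGTH check for strings, arrays and collections. */
final class LengthCheck {

    /** Length of a string, array or collection; -1 for unsupported types. */
    static int lengthOf(Object value) {
        if (value instanceof CharSequence) {
            return ((CharSequence) value).length();
        }
        if (value instanceof Collection) {
            return ((Collection<?>) value).size();
        }
        if (value != null && value.getClass().isArray()) {
            return Array.getLength(value); // works for primitive and object arrays
        }
        return -1;
    }

    static boolean isValid(Object value, Integer minLength, Integer maxLength, Integer exactLength) {
        if (value == null) {
            return true; // assumption: nulls are NOT_NULL's concern
        }
        int len = lengthOf(value);
        if (len < 0) {
            return false; // assumption: unsupported type counts as a failure
        }
        if (exactLength != null) {
            return len == exactLength;
        }
        return (minLength == null || len >= minLength) && (maxLength == null || len <= maxLength);
    }
}
```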
+
+##### REGEX
+Validates that a string value matches a regular expression pattern.
+
+Parameters:
+- `rule_type`: "REGEX"
+- `pattern`: Regular expression pattern (required)
+- `case_sensitive` (optional): Whether pattern matching is case sensitive
(default: true)
+- `custom_message` (optional): Custom error message
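
The `case_sensitive` option maps naturally onto `java.util.regex.Pattern` flags. This sketch assumes the whole value must match (`Matcher.matches()`), which makes the `^...$` anchors in the examples harmless; whether the rule uses `matches()` or `find()` is an assumption. `RegexCheck` is a hypothetical helper.

```java
import java.util.regex.Pattern;

/** Hypothetical REGEX check with an optional case-insensitive mode. */
final class RegexCheck {

    static Pattern compile(String pattern, boolean caseSensitive) {
        // UNICODE_CASE extends case folding beyond US-ASCII.
        return caseSensitive
                ? Pattern.compile(pattern)
                : Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE);
    }

    static boolean isValid(Pattern compiled, Object value) {
        // Nulls pass (assumption); otherwise the whole string must match.
        return value == null || compiled.matcher(value.toString()).matches();
    }
}
```

Compiling once per rule, rather than once per row, keeps the per-row cost to a single `matcher(...)` call.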
+
+##### UDF (User Defined Function)
+Validates field values using custom business logic implemented as a User
Defined Function.
+
+Parameters:
+- `rule_type`: "UDF"
+- `function_name`: Name of the UDF function to execute (required)
+- `custom_message` (optional): Custom error message
+
+**Built-in UDF Functions:**
+- `EMAIL`: Validates email addresses using practical validation rules based on
OWASP recommendations
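
For illustration only, a practical email check using a pattern in the style of the OWASP Validation Regex Repository, plus commonly used length limits (64 characters for the local part, 254 for the whole address). This is an assumption about what "practical validation rules" might involve; the built-in `EmailValidator` may apply different rules, and `EmailCheck` is a hypothetical name.

```java
import java.util.regex.Pattern;

/** Hypothetical practical email check; not the built-in EMAIL UDF. */
final class EmailCheck {

    // OWASP-style pattern: dotted local part, one or more domain labels, alphabetic TLD.
    private static final Pattern EMAIL = Pattern.compile(
            "^[a-zA-Z0-9_+&*-]+(?:\\.[a-zA-Z0-9_+&*-]+)*@(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z]{2,7}$");

    static boolean isValid(String value) {
        if (value == null) {
            return true; // assumption: leave nulls to NOT_NULL
        }
        String s = value.trim();
        int at = s.indexOf('@'); // index of '@' equals the local-part length
        if (s.length() > 254 || at < 1 || at > 64) {
            return false;
        }
        return EMAIL.matcher(s).matches();
    }
}
```

Under this sketch `bob@invalid-email` fails because the domain has no dot-separated TLD, and `alice@bob@example.com` fails because `@` is not allowed in the domain.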
+
+**Creating Custom UDF Functions:**
+To create a custom UDF function:
+1. Implement the `DataValidatorUDF` interface
+2. Use `@AutoService(DataValidatorUDF.class)` annotation
+3. Provide a unique `functionName()`
+4. Implement the `validate()` method with your custom logic
+
+### common options [string]
+
+Transform plugin common parameters. See [Transform Plugin](common-options.md) for details.
+
+## Examples
+
+### Example 1: Basic Validation with FAIL Mode
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = 0
+ max_value = 150
+ },
+ {
+ field_name = "email"
+ rule_type = "REGEX"
+ pattern = "^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$"
+ }
+ ]
+ }
+}
+```
+
+### Example 2: Validation with SKIP Mode
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "SKIP"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "name"
+ rule_type = "LENGTH"
+ min_length = 2
+ max_length = 50
+ }
+ ]
+ }
+}
+```
+
+### Example 3: Validation with ROUTE_TO_TABLE Mode
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "ROUTE_TO_TABLE"
+ row_error_handle_way.error_table = "error_data"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = 0
+ max_value = 150
+ }
+ ]
+ }
+}
+```
+
+**Note**: When using `ROUTE_TO_TABLE`, ensure your sink connector supports
multiple tables. Valid data will be sent to the main output table, while
invalid data will be routed to the specified error table.
+
+In this example:
+- Data that passes validation will maintain the original schema (containing
name, age, etc. fields) and be sent to the main output table
+- Data that fails validation will be converted to the error table schema
(containing source_table_id, source_table_path, original_data,
validation_errors, create_time fields) and routed to the "error_data" table
+
+### Example 4: Nested Rules Format
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "name"
+ rules = [
+ {
+ rule_type = "NOT_NULL"
+ custom_message = "Name is required"
+ },
+ {
+ rule_type = "LENGTH"
+ min_length = 2
+ max_length = 50
+ custom_message = "Name must be between 2 and 50 characters"
+ }
+ ]
+ }
+ ]
+ }
+}
+```
+
+### Example 5: Email Validation using Built-in UDF
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "email"
+ rule_type = "UDF"
+ function_name = "EMAIL"
+ custom_message = "Invalid email address format"
+ }
+ ]
+ }
+}
+```
+
+## UDF Development Guide
+
+### Creating Custom UDF Functions
+
+To create a custom validation UDF function, follow these steps:
+
+#### 1. Implement the DataValidatorUDF Interface
+
+```java
+package com.example.validator;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+import org.apache.seatunnel.transform.validator.udf.DataValidatorUDF;
+import com.google.auto.service.AutoService;
+
+@AutoService(DataValidatorUDF.class)
+public class PhoneValidator implements DataValidatorUDF {
+
+ @Override
+ public String functionName() {
+ return "PHONE_VALIDATOR";
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+
+ if (value == null) {
+ return ValidationResult.success();
+ }
+
+ String phone = value.toString().trim();
+
+ // Custom phone validation logic
+ if (phone.matches("^\\+?[1-9]\\d{1,14}$")) {
+ return ValidationResult.success();
+ } else {
+ return ValidationResult.failure("Invalid phone number format: " + phone);
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "Validates international phone number format";
+ }
+}
+```
+
+#### 2. Register the UDF
+
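+The UDF is registered automatically through the `@AutoService(DataValidatorUDF.class)` annotation: at compile time the AutoService annotation processor generates the `META-INF/services/org.apache.seatunnel.transform.validator.udf.DataValidatorUDF` provider-configuration file, which Java's ServiceLoader mechanism reads to discover and load UDF implementations at runtime.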
+
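Conceptually, discovery amounts to loading every registered implementation and indexing it by `functionName()`. A hedged sketch, using a hypothetical `NamedValidator` interface in place of `DataValidatorUDF` and assuming duplicate function names are rejected (the real lookup code may behave differently):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

/** Hypothetical stand-in for DataValidatorUDF, reduced to what lookup needs. */
interface NamedValidator {
    String functionName();
}

/** Hypothetical registry keyed by function_name. */
final class UdfRegistry {

    static Map<String, NamedValidator> index(Iterable<? extends NamedValidator> udfs) {
        Map<String, NamedValidator> byName = new HashMap<>();
        for (NamedValidator udf : udfs) {
            if (byName.put(udf.functionName(), udf) != null) {
                // Assumption: two UDFs claiming the same name is a configuration error.
                throw new IllegalStateException("Duplicate UDF function name: " + udf.functionName());
            }
        }
        return byName;
    }

    /** ServiceLoader reads META-INF/services/<interface FQN> files from the classpath. */
    static Map<String, NamedValidator> fromClasspath() {
        return index(ServiceLoader.load(NamedValidator.class));
    }
}
```

With no provider-configuration file on the classpath, `fromClasspath()` simply returns an empty map, which is why a UDF JAR that was built without the annotation processor is silently ignored.
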
+#### 3. Package and Deploy
+
+1. Compile your UDF class and package it into a JAR file
+2. Place the JAR file in the SeaTunnel classpath
+3. The UDF will be automatically discovered and available for use
+
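+**Usage Example**: a UDF is referenced by the name returned from `functionName()`, shown here with the built-in `EMAIL` UDF: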
+```hocon
+{
+ field_name = "email"
+ rule_type = "UDF"
+ function_name = "EMAIL"
+ custom_message = "Please provide a valid email address"
+}
+```
\ No newline at end of file
diff --git a/docs/zh/transform-v2/data-validator.md
b/docs/zh/transform-v2/data-validator.md
new file mode 100644
index 0000000000..c744c0d9a8
--- /dev/null
+++ b/docs/zh/transform-v2/data-validator.md
@@ -0,0 +1,332 @@
+# DataValidator
+
+> Data validation transform plugin
+
+## Description
+
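+The DataValidator transform validates field values according to configured rules and handles validation failures based on the specified error handling strategy. It supports multiple validation rule types, including null checks, range validation, length validation, and regex pattern matching.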
+
+## Options
+
+| name | type | required | default value |
+|-----------------|--------|----------|---------------|
+| row_error_handle_way | enum | no | FAIL |
+| row_error_handle_way.error_table | string | no | |
+| field_rules | array | yes | |
+
+### row_error_handle_way [enum]
+
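+Error handling strategy when validation fails:
+- `FAIL`: Fail the entire task when a validation error occurs
+- `SKIP`: Skip invalid rows and continue processing
+- `ROUTE_TO_TABLE`: Route invalid data to a specified error table
+
+**Note**: `ROUTE_TO_TABLE` mode only works with sink connectors that support multiple tables. The sink must be able to handle data routed to different destination tables.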
+
+### row_error_handle_way.error_table [string]
+
+Target table name for routing invalid data when `row_error_handle_way` is set to `ROUTE_TO_TABLE`. This parameter is required in `ROUTE_TO_TABLE` mode.
+
+#### Error Table Schema
+
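+When using `ROUTE_TO_TABLE` mode, DataValidator automatically creates an error table with a fixed schema to store data that fails validation. The error table contains the following fields: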
+
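+| Field Name | Data Type | Description |
+|------------|-----------|-------------|
+| source_table_id | STRING | Source table identifier, identifying the table the data came from |
+| source_table_path | STRING | Source table path, with the complete table path information |
+| original_data | STRING | JSON representation of the original data, containing the complete row that failed validation |
+| validation_errors | STRING | JSON array of validation error details, containing all fields that failed validation and their error information |
+| create_time | TIMESTAMP | Creation time of the validation error |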
+
+**Complete Error Table Record Example**:
+```json
+{
+ "source_table_id": "users_table",
+ "source_table_path": "database.users",
+ "original_data": "{\"id\": 123, \"name\": null, \"age\": 200, \"email\": \"invalid-email\"}",
+ "validation_errors": "[{\"field_name\": \"name\", \"error_message\": \"Field 'name' cannot be null\"}, {\"field_name\": \"age\", \"error_message\": \"Field 'age' value 200 is not within range [0, 150]\"}, {\"field_name\": \"email\", \"error_message\": \"Field 'email' does not match pattern '^[\\\\w-\\\\.]+@([\\\\w-]+\\\\.)+[\\\\w-]{2,4}$'\"}]",
+ "create_time": "2024-01-15T10:30:45"
+}
+```
+
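+**Data Routing Mechanism**:
+- Data that passes validation keeps the original schema and is routed to the main output table
+- Data that fails validation is converted to the error table schema above and routed to the specified error table
+- Each row that fails validation produces one record in the error table, containing the complete original data and detailed error information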
+
+### field_rules [array]
+
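+Array of field validation rules. Each rule defines the validation criteria for a specific field.
+
+#### Field Rule Structure
+
+Each field rule contains:
+- `field_name`: Name of the field to validate
+- `rules`: Array of validation rules to apply (nested format), or individual rule properties (flat format)
+
+#### Validation Rule Types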
+
+##### NOT_NULL
+Validates that a field value is not null.
+
+Parameters:
+- `rule_type`: "NOT_NULL"
+- `custom_message` (optional): Custom error message
+
+##### RANGE
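+Validates that a numeric value is within a specified range.
+
+Parameters:
+- `rule_type`: "RANGE"
+- `min_value` (optional): Minimum allowed value
+- `max_value` (optional): Maximum allowed value
+- `min_inclusive` (optional): Whether the minimum value is inclusive (default: true)
+- `max_inclusive` (optional): Whether the maximum value is inclusive (default: true)
+- `custom_message` (optional): Custom error message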
+
+##### LENGTH
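+Validates the length of string, array, or collection values.
+
+Parameters:
+- `rule_type`: "LENGTH"
+- `min_length` (optional): Minimum allowed length
+- `max_length` (optional): Maximum allowed length
+- `exact_length` (optional): Exact required length
+- `custom_message` (optional): Custom error message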
+
+##### REGEX
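+Validates that a string value matches a regular expression pattern.
+
+Parameters:
+- `rule_type`: "REGEX"
+- `pattern`: Regular expression pattern (required)
+- `case_sensitive` (optional): Whether pattern matching is case sensitive (default: true)
+- `custom_message` (optional): Custom error message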
+
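+##### UDF (User Defined Function)
+Validates field values with a User Defined Function that implements custom business logic.
+
+Parameters:
+- `rule_type`: "UDF"
+- `function_name`: Name of the UDF function to execute (required)
+- `custom_message` (optional): Custom error message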
+
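+**Built-in UDF Functions:**
+- `EMAIL`: Validates email addresses using practical validation rules based on OWASP recommendations
+
+**Creating Custom UDF Functions:**
+To create a custom UDF function:
+1. Implement the `DataValidatorUDF` interface
+2. Use the `@AutoService(DataValidatorUDF.class)` annotation
+3. Provide a unique `functionName()`
+4. Implement the `validate()` method containing your custom logic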
+
+### common options [string]
+
+Transform plugin common parameters. See [Transform Plugin](common-options.md) for details.
+
+## Examples
+
+### Example 1: Basic Validation with FAIL Mode
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = 0
+ max_value = 150
+ },
+ {
+ field_name = "email"
+ rule_type = "REGEX"
+ pattern = "^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$"
+ }
+ ]
+ }
+}
+```
+
+### Example 2: Validation with SKIP Mode
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "SKIP"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "name"
+ rule_type = "LENGTH"
+ min_length = 2
+ max_length = 50
+ }
+ ]
+ }
+}
+```
+
+### Example 3: Validation with ROUTE_TO_TABLE Mode
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "ROUTE_TO_TABLE"
+ row_error_handle_way.error_table = "error_data"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = 0
+ max_value = 150
+ }
+ ]
+ }
+}
+```
+
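+**Note**: When using `ROUTE_TO_TABLE`, make sure your sink connector supports multiple tables. Valid data is sent to the main output table, while invalid data is routed to the specified error table.
+
+In this example:
+- Data that passes validation keeps the original schema (containing fields such as name and age) and is sent to the main output table
+- Data that fails validation is converted to the error table schema (containing the source_table_id, source_table_path, original_data, validation_errors, and create_time fields) and routed to the "error_data" table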
+
+### Example 4: Nested Rules Format
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "name"
+ rules = [
+ {
+ rule_type = "NOT_NULL"
+ custom_message = "Name is required"
+ },
+ {
+ rule_type = "LENGTH"
+ min_length = 2
+ max_length = 50
+ custom_message = "Name must be between 2 and 50 characters"
+ }
+ ]
+ }
+ ]
+ }
+}
+```
+
+### Example 5: Email Validation Using the Built-in UDF
+
+```hocon
+transform {
+ DataValidator {
+ plugin_input = "source_table"
+ plugin_output = "validated_table"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "email"
+ rule_type = "UDF"
+ function_name = "EMAIL"
+ custom_message = "Invalid email address format"
+ }
+ ]
+ }
+}
+```
+
+## UDF Development Guide
+
+### Creating Custom UDF Functions
+
+To create a custom validation UDF function, follow these steps:
+
+#### 1. Implement the DataValidatorUDF Interface
+
+```java
+package com.example.validator;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+import org.apache.seatunnel.transform.validator.udf.DataValidatorUDF;
+import com.google.auto.service.AutoService;
+
+@AutoService(DataValidatorUDF.class)
+public class PhoneValidator implements DataValidatorUDF {
+
+ @Override
+ public String functionName() {
+ return "PHONE_VALIDATOR";
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+
+ if (value == null) {
+ return ValidationResult.success();
+ }
+
+ String phone = value.toString().trim();
+
+ // Custom phone number validation logic
+ if (phone.matches("^\\+?[1-9]\\d{1,14}$")) {
+ return ValidationResult.success();
+ } else {
+ return ValidationResult.failure("Invalid phone number format: " + phone);
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "Validates international phone number format";
+ }
+}
+```
+
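+#### 2. Register the UDF
+
+The UDF is registered automatically through the `@AutoService(DataValidatorUDF.class)` annotation, which uses Java's ServiceLoader mechanism to discover and load UDF implementations at runtime.
+
+#### 3. Package and Deploy
+
+1. Compile your UDF class and package it into a JAR file
+2. Place the JAR file on the SeaTunnel classpath
+3. The UDF is discovered automatically and becomes available for use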
+
+**Usage Example**:
+```hocon
+{
+ field_name = "email"
+ rule_type = "UDF"
+ function_name = "EMAIL"
+ custom_message = "Please provide a valid email address"
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 38eec32313..38d74d31f3 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -148,4 +148,4 @@ seatunnel.sink.GraphQL = connector-graphql
seatunnel.sink.Aerospike = connector-aerospike
# For custom transforms, make sure to use the seatunnel.transform.[PluginIdentifier]=[JarPrefix] naming convention. For example:
-# seatunnel.transform.Sql = seatunnel-transforms-v2
\ No newline at end of file
+# seatunnel.transform.Sql = seatunnel-transforms-v2
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 9bbe72fe24..0cf3e11c3d 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -26,6 +26,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of '<field>'"),
UNSUPPORTED_ENCODING("COMMON-08", "unsupported encoding '<encoding>'"),
+ VALIDATION_FAILED("COMMON-38", "Data validation failed: '<message>'"),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
"COMMON-16",
"'<connector>' <type> unsupported convert type '<dataType>' of '<field>' to SeaTunnel data type."),
@@ -79,7 +80,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method '<methodName>'"),
KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
CLOSE_FAILED("COMMON-36", "'<identifier>' close failed."),
- SEATUNNEL_ROW_SERIALIZE_FAILED("COMMON-36", "Seatunnel row serialize failed. Row={ '<row>' }"),
+ SEATUNNEL_ROW_SERIALIZE_FAILED("COMMON-37", "Seatunnel row serialize failed. Row={ '<row>' }"),
;
private final String code;
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
index 6f3f64f28a..26e8ac55b7 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
@@ -43,7 +43,7 @@ public class TransformSpecificationCheckTest {
FactoryUtil.discoverFactories(
Thread.currentThread().getContextClassLoader(),
TableTransformFactory.class);
- Assertions.assertEquals(18, factories.size());
+ Assertions.assertEquals(19, factories.size());
}
@Test
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestDataValidatorIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestDataValidatorIT.java
new file mode 100644
index 0000000000..fb0fa49488
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestDataValidatorIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestDataValidatorIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testDataValidatorWithValidData(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/data_validator_valid.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testDataValidatorWithSkipMode(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/data_validator_skip.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testDataValidatorWithFailMode(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/data_validator_fail.conf");
+ // Should fail due to validation errors
+ Assertions.assertNotEquals(0, execResult.getExitCode());
+
+ // Check for validation error messages in stderr
+ String stderr = execResult.getStderr();
+ Assertions.assertNotNull(stderr, "stderr should not be null");
+ Assertions.assertTrue(
+ stderr.contains("Validation failed") ||
stderr.contains("VALIDATION_FAILED"),
+ "stderr should contain validation error message, but was: " +
stderr);
+
+ // Check for specific validation rule failure (NOT_NULL for name field)
+ Assertions.assertTrue(
+ stderr.contains("name") || stderr.contains("NOT_NULL") ||
stderr.contains("null"),
"stderr should contain reference to name field validation failure, but was: "
+ + stderr);
+ }
+
+ @TestTemplate
+ public void testDataValidatorWithRouteToTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/data_validator_route_to_table.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testDataValidatorWithUDF(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/data_validator_email_udf.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_email_udf.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_email_udf.conf
new file mode 100644
index 0000000000..88f0c29d27
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_email_udf.conf
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates DataValidator transform with Email UDF validation
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 4
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ email = "string"
+ }
+ }
+ rows = [
+ {fields = [1, "John Doe", "[email protected]"], kind = INSERT} # Valid email
+ {fields = [2, "Jane Smith", "[email protected]"], kind = INSERT} # Valid email
+ {fields = [3, "Bob Johnson", "bob@invalid-email"], kind = INSERT} # Invalid: domain has no TLD
+ {fields = [4, "Alice Brown", "alice@[email protected]"], kind = INSERT} # Invalid: multiple @
+ ]
+ }
+}
+
+transform {
+ DataValidator {
+ plugin_input = "fake"
+ plugin_output = "validated"
+ row_error_handle_way = "SKIP"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "email"
+ rule_type = "UDF"
+ function_name = "EMAIL"
+ custom_message = "Email validation failed"
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "validated"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = email
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_fail.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_fail.conf
new file mode 100644
index 0000000000..9f121bc27e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_fail.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates DataValidator transform with FAIL mode
+###### This test should fail due to validation errors
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ email = "string"
+ }
+ }
+ rows = [
+ {fields = [1, "John Doe", 25, "[email protected]"], kind = INSERT}
+ {fields = [2, null, 30, "[email protected]"], kind = INSERT} # Invalid: null name - should cause failure
+ ]
+ }
+}
+
+transform {
+ DataValidator {
+ plugin_input = "fake"
+ plugin_output = "validated"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "name"
+ rules = [
+ {
+ rule_type = "NOT_NULL"
+ }
+ ]
+ },
+ {
+ field_name = "age"
+ rules = [
+ {
+ rule_type = "RANGE"
+ min_value = 0
+ max_value = 150
+ }
+ ]
+ }
+ ]
+ }
+}
+
+sink {
+ Console {
+ plugin_input = "validated"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_route_to_table.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_route_to_table.conf
new file mode 100644
index 0000000000..104020f627
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_route_to_table.conf
@@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates DataValidator transform with ROUTE_TO_TABLE mode
+###### Invalid rows are routed to the error table instead of being skipped or failing the job
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ email = "string"
+ }
+ }
+ rows = [
+ {fields = [1, "John Doe", 25, "[email protected]"], kind = INSERT}
+ {fields = [2, "Jane Smith", 30, "[email protected]"], kind = INSERT}
+ {fields = [3, "Charlie Wilson", 32, "[email protected]"], kind = INSERT}
+ {fields = [4, null, 30, "[email protected]"], kind = INSERT} # Invalid: null name
+ {fields = [5, "Bob Johnson", 200, "[email protected]"], kind = INSERT} # Invalid: age > 150
+ {fields = [6, "Alice Brown", 28, "invalid-email"], kind = INSERT} # Invalid: bad email format
+ ]
+ }
+}
+
+transform {
+ DataValidator {
+ plugin_input = "fake"
+ plugin_output = "validated"
+ row_error_handle_way = "ROUTE_TO_TABLE"
+ row_error_handle_way.error_table = "error_data"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = "0"
+ max_value = "150"
+ },
+ {
+ field_name = "email"
+ rule_type = "REGEX"
+ pattern = "^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$"
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "validated"
+ rules = {
+ tables_configs = [
+ {
+ table_path = "fake"
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = email
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "error_data"
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = source_table_id
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = original_data
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = validation_errors
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
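Note: the Assert sink above expects exactly 3 rows in `fake` and 3 in `error_data`. The hypothetical sketch below re-applies the three configured rules in plain Java to the six source rows to show where that split comes from. It rests on three assumptions. RANGE bounds are treated as inclusive. A null value is left to NOT_NULL rather than failing RANGE or REGEX. The email pattern is checked with `Matcher.matches()`. The email addresses are placeholders because the real ones are obscured in this copy of the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical re-check of the rules in data_validator_route_to_table.conf.
final class RouteSplitCheck {

    // The config's pattern, with HOCON escaping replaced by Java string escaping.
    static final Pattern EMAIL = Pattern.compile("^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$");

    static final class Row {
        final int id;
        final String name;
        final Integer age;
        final String email;

        Row(int id, String name, Integer age, String email) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.email = email;
        }
    }

    // Collects every failed rule for a row instead of stopping at the first one.
    static List<String> errors(Row row) {
        List<String> errors = new ArrayList<>();
        if (row.name == null) {
            errors.add("name: NOT_NULL");
        }
        // Assumed inclusive bounds, as configured: min_value = 0, max_value = 150
        if (row.age != null && (row.age < 0 || row.age > 150)) {
            errors.add("age: RANGE");
        }
        if (row.email != null && !EMAIL.matcher(row.email).matches()) {
            errors.add("email: REGEX");
        }
        return errors;
    }

    static List<Row> rows() {
        List<Row> rows = new ArrayList<>();
        // Placeholder addresses; the originals are obscured in this copy of the patch.
        rows.add(new Row(1, "John Doe", 25, "john@example.com"));
        rows.add(new Row(2, "Jane Smith", 30, "jane@example.com"));
        rows.add(new Row(3, "Charlie Wilson", 32, "charlie@example.com"));
        rows.add(new Row(4, null, 30, "null-name@example.com"));
        rows.add(new Row(5, "Bob Johnson", 200, "bob@example.com"));
        rows.add(new Row(6, "Alice Brown", 28, "invalid-email"));
        return rows;
    }

    public static void main(String[] args) {
        int valid = 0;
        int routed = 0;
        for (Row row : rows()) {
            if (errors(row).isEmpty()) {
                valid++;
            } else {
                routed++;
            }
        }
        System.out.println("fake=" + valid + ", error_data=" + routed);
    }
}
```

Running `main` prints `fake=3, error_data=3`, matching the MIN_ROW/MAX_ROW bounds of both table configs.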
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_skip.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_skip.conf
new file mode 100644
index 0000000000..e114b86eb2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_skip.conf
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates DataValidator transform with SKIP mode
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ email = "string"
+ }
+ }
+ rows = [
+ {fields = [1, "John Doe", 25, "[email protected]"], kind = INSERT}
+ {fields = [2, null, 30, "[email protected]"], kind = INSERT} # Invalid: null name
+ {fields = [3, "Bob Johnson", 200, "[email protected]"], kind = INSERT} # Invalid: age > 150
+ {fields = [4, "Alice Brown", 28, "invalid-email"], kind = INSERT} # Invalid: bad email
+ {fields = [5, "Charlie Wilson", 32, "[email protected]"], kind = INSERT}
+ ]
+ }
+}
+
+transform {
+ DataValidator {
+ plugin_input = "fake"
+ plugin_output = "validated"
+ row_error_handle_way = "SKIP"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "name"
+ rule_type = "LENGTH"
+ min_length = "2"
+ max_length = "50"
+ },
+ {
+ field_name = "age"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = "0"
+ max_value = "150"
+ },
+ {
+ field_name = "email"
+ rule_type = "REGEX"
+ pattern = "^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$"
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "validated"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 2 # Only 2 valid rows should pass
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = email
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_valid.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_valid.conf
new file mode 100644
index 0000000000..bfeddec7b9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/data_validator_valid.conf
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates DataValidator transform with valid data
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ email = "string"
+ }
+ }
+ rows = [
+ {fields = [1, "John Doe", 25, "[email protected]"], kind = INSERT}
+ {fields = [2, "Jane Smith", 30, "[email protected]"], kind = INSERT}
+ {fields = [3, "Bob Johnson", 35, "[email protected]"], kind = INSERT}
+ {fields = [4, "Alice Brown", 28, "[email protected]"], kind = INSERT}
+ {fields = [5, "Charlie Wilson", 32, "[email protected]"], kind = INSERT}
+ ]
+ }
+}
+
+transform {
+ DataValidator {
+ plugin_input = "fake"
+ plugin_output = "validated"
+ row_error_handle_way = "FAIL"
+ field_rules = [
+ {
+ field_name = "name"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "name"
+ rule_type = "LENGTH"
+ min_length = "2"
+ max_length = "50"
+ },
+ {
+ field_name = "age"
+ rule_type = "NOT_NULL"
+ },
+ {
+ field_name = "age"
+ rule_type = "RANGE"
+ min_value = "0"
+ max_value = "150"
+ },
+ {
+ field_name = "email"
+ rule_type = "REGEX"
+ pattern = "^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$"
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "validated"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = email
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
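Note: the same email pattern appears in the route, skip, and valid configs. HOCON unescapes `\\w` to `\w`, so the regex the transform receives is `^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$`. The hypothetical check below uses plain `java.util.regex` and assumes the REGEX rule requires a whole-string match. Because the pattern is anchored with `^` and `$`, `find()` would give the same answers on single-line input. It shows that the trailing `{2,4}` also rejects otherwise well-formed addresses whose top-level domain is longer than four characters.

```java
import java.util.regex.Pattern;

// Hypothetical check of the email pattern shared by the validator test configs.
final class EmailPatternCheck {

    // HOCON "^[\\w-\\.]+..." becomes ^[\w-\.]+...; Java source doubles the backslashes again.
    static final Pattern EMAIL = Pattern.compile("^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$");

    static boolean isValid(String email) {
        return email != null && EMAIL.matcher(email).matches();
    }

    public static void main(String[] args) {
        String[] samples = {
            "john.doe@example.com", "a@b.co.uk", "invalid-email", "user@example.museum"
        };
        for (String sample : samples) {
            System.out.println(sample + " -> " + isValid(sample));
        }
    }
}
```

`main` prints `true` for the first two samples and `false` for the last two. Note that `user@example.museum` fails only because of the four-character TLD limit.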
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
index 0f87d6d45f..b3fa0a49a9 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
@@ -23,7 +23,9 @@ public enum ErrorHandleWay {
// Skip the data when error occurs
SKIP,
// Skip the row when error occurs
- SKIP_ROW;
+ SKIP_ROW,
+ // Route invalid data to specified table
+ ROUTE_TO_TABLE;
public boolean allowSkipThisRow() {
return this == SKIP_ROW;
@@ -32,4 +34,8 @@ public enum ErrorHandleWay {
public boolean allowSkip() {
return this == SKIP;
}
+
+ public boolean allowRouteToTable() {
+ return this == ROUTE_TO_TABLE;
+ }
}
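Note: `ROUTE_TO_TABLE` only changes what `DataValidatorTransform.transformRow` does with a row that has already failed validation. Below is a minimal sketch of that decision. `ErrorRoutingSketch` and its local `Way` enum are hypothetical stand-ins. The real transform throws `TransformCommonError.validationFailed` rather than `IllegalStateException`, and it returns a full error row rather than just the target table name.

```java
import java.util.List;

// Hypothetical stand-in for how transformRow treats a row that failed validation.
final class ErrorRoutingSketch {

    // Local mirror of ErrorHandleWay after this patch.
    enum Way { FAIL, SKIP, SKIP_ROW, ROUTE_TO_TABLE }

    /** Returns the table an invalid row is sent to, or null when the row is dropped. */
    static String handleInvalidRow(Way way, String errorTable, List<String> errors) {
        switch (way) {
            case FAIL:
                throw new IllegalStateException("Validation failed: " + String.join("; ", errors));
            case ROUTE_TO_TABLE:
                // With no error table configured, the transform warns and drops the row.
                return (errorTable == null || errorTable.isEmpty()) ? null : errorTable;
            default:
                // SKIP drops the row; SKIP_ROW is not offered by row_error_handle_way,
                // and this sketch simply treats it like SKIP.
                return null;
        }
    }
}
```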
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java
index f2340086d6..daafc08e2d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java
@@ -51,7 +51,10 @@ public class TransformCommonOptions {
Options.key("row_error_handle_way")
.singleChoice(
ErrorHandleWay.class,
- Arrays.asList(ErrorHandleWay.FAIL, ErrorHandleWay.SKIP))
+ Arrays.asList(
+ ErrorHandleWay.FAIL,
+ ErrorHandleWay.SKIP,
+ ErrorHandleWay.ROUTE_TO_TABLE))
.defaultValue(ErrorHandleWay.FAIL)
.withDescription(
"The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
@@ -66,4 +69,11 @@ public class TransformCommonOptions {
+ "When fail is selected, data format error will block and an exception will be thrown. "
+ "When skip is selected, data format error will skip this column data."
+ "When skip_row is selected, data format error will skip this line data.");
+
+ public static final Option<String> ERROR_TABLE_OPTION =
+ Options.key("row_error_handle_way.error_table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Target table name for routing invalid data when row_error_handle_way is ROUTE_TO_TABLE");
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
index 46e6bf833c..66614d47d8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -18,6 +18,9 @@
package org.apache.seatunnel.transform.exception;
import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+
+import org.apache.commons.collections4.map.SingletonMap;
import java.util.HashMap;
import java.util.List;
@@ -81,4 +84,9 @@ public class TransformCommonError {
params.put("wherebody", wherebody);
return new TransformException(WHERE_STATEMENT_ERROR, params, cause);
}
+
+ public static TransformException validationFailed(String message) {
+ Map<String, String> params = new SingletonMap<>("message", message);
+ return new TransformException(CommonErrorCode.VALIDATION_FAILED, params);
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
index 149bc96e9c..57caaf98ab 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
@@ -27,7 +27,7 @@ public class TransformException extends SeaTunnelRuntimeException {
super(seaTunnelErrorCode, errorMessage);
}
- TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
+ public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
super(seaTunnelErrorCode, params);
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransform.java
new file mode 100644
index 0000000000..df011c9827
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransform.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+import org.apache.seatunnel.transform.validator.ValidationResultHandler.ValidationProcessResult;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** DataValidator Transform for validating field values according to configured rules. */
+@Slf4j
+public class DataValidatorTransform extends AbstractCatalogSupportMapTransform {
+ public static final String PLUGIN_NAME = "DataValidator";
+ public static final String SOURCE_TABLE_ID = "source_table_id";
+ public static final String SOURCE_TABLE_PATH = "source_table_path";
+ public static final String ORIGINAL_DATA = "original_data";
+ public static final String VALIDATION_ERRORS = "validation_errors";
+ public static final String CREATE_TIME = "create_time";
+
+ private final DataValidatorTransformConfig config;
+ private final List<FieldValidator> fieldValidators;
+ private final ValidationResultHandler resultHandler;
+ private final ErrorHandleWay errorHandleWay;
+ private final String errorTable;
+
+ public DataValidatorTransform(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+ super(catalogTable);
+ this.config = DataValidatorTransformConfig.of(readonlyConfig);
+ this.errorHandleWay =
+ readonlyConfig
+ .getOptional(TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION)
+ .orElse(ErrorHandleWay.FAIL);
+ // Only consulted in ROUTE_TO_TABLE mode; configured via row_error_handle_way.error_table
+ this.errorTable =
+ readonlyConfig
+ .getOptional(TransformCommonOptions.ERROR_TABLE_OPTION)
+ .orElse(null);
+ this.resultHandler = new ValidationResultHandler();
+ this.fieldValidators = initializeFieldValidators();
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ // Execute validation for all fields
+ Map<String, List<ValidationResult>> fieldResults = new HashMap<>();
+ ValidationContext context =
+ new ValidationContext(
+ inputRow,
+ inputCatalogTable.getTableSchema().toPhysicalRowDataType(),
+ new HashMap<>(),
+ null);
+
+ // Always validate all fields (no fail fast)
+ for (FieldValidator validator : fieldValidators) {
+ String fieldName = validator.getFieldName();
+ Object fieldValue = inputRow.getField(validator.getFieldIndex());
+
+ // Update context with current field name
+ ValidationContext fieldContext =
+ new ValidationContext(
+ inputRow,
+ inputCatalogTable.getTableSchema().toPhysicalRowDataType(),
+ context.getGlobalContext(),
+ fieldName);
+
+ List<ValidationResult> results = validator.validate(fieldValue, fieldContext, false);
+ fieldResults.put(fieldName, results);
+ }
+
+ // Process validation results
+ ValidationProcessResult processResult =
+ resultHandler.processResults(inputRow, fieldResults);
+
+ // Handle validation failures
+ if (!processResult.isValid()) {
+ log.error(
+ "Validation failed for row: {}",
+ String.join("; ", processResult.getErrorMessages()));
+
+ if (errorHandleWay == ErrorHandleWay.FAIL) {
+ String message =
+ "Validation failed: " + String.join("; ", processResult.getErrorMessages());
+ throw TransformCommonError.validationFailed(message);
+ } else if (errorHandleWay == ErrorHandleWay.SKIP) {
+ return null; // Skip this row
+ } else if (errorHandleWay.allowRouteToTable()) {
+ // Route invalid data to error table by setting tableId
+ if (errorTable != null && !errorTable.isEmpty()) {
+ String sourceTableId = inputCatalogTable.getTableId().toString();
+ String sourceTablePath = inputCatalogTable.getTablePath().toString();
+ SeaTunnelRow errorRow =
+ generateErrorRow(
+ inputRow,
+ inputCatalogTable.getTableSchema().toPhysicalRowDataType(),
+ sourceTableId,
+ sourceTablePath,
+ fieldResults,
+ errorTable);
+ errorRow.setTableId(errorTable);
+ log.debug("Routing invalid data to unified error table: {}", errorTable);
+ return errorRow;
+ } else {
+ log.warn("Error table not configured, skipping invalid row");
+ return null;
+ }
+ }
+ }
+ return inputRow;
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ List<CatalogTable> outputTables = new ArrayList<>();
+
+ outputTables.add(getProducedCatalogTable());
+ if (errorHandleWay.allowRouteToTable() && errorTable != null && !errorTable.isEmpty()) {
+ TableIdentifier errorTableId =
+ TableIdentifier.of(
+ inputCatalogTable.getTableId().getCatalogName(),
+ inputCatalogTable.getTableId().getDatabaseName(),
+ errorTable);
+ CatalogTable errorCatalogTable =
+ CatalogTable.of(
+ errorTableId,
+ createErrorSchema(),
+ new HashMap<>(),
+ Collections.emptyList(),
+ "Error table for validation failures");
+ outputTables.add(errorCatalogTable);
+ }
+
+ return outputTables;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return inputCatalogTable.getTableSchema();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputCatalogTable.getTableId().copy();
+ }
+
+ private List<FieldValidator> initializeFieldValidators() {
+ List<FieldValidator> validators = new ArrayList<>();
+ SeaTunnelRowType rowType = inputCatalogTable.getTableSchema().toPhysicalRowDataType();
+
+ for (DataValidatorTransformConfig.FieldValidationRule fieldRule : config.getFieldRules()) {
+ int fieldIndex = rowType.indexOf(fieldRule.getFieldName());
+ if (fieldIndex >= 0) {
+ validators.add(
+ new FieldValidator(
+ fieldRule.getFieldName(),
+ fieldIndex,
+ rowType.getFieldType(fieldIndex),
+ fieldRule.getRules()));
+ } else {
+ log.warn(
+ "Field '{}' not found in schema, skipping validation",
+ fieldRule.getFieldName());
+ }
+ }
+
+ return validators;
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ private SeaTunnelRow generateErrorRow(
+ SeaTunnelRow originalRow,
+ SeaTunnelRowType originalRowType,
+ String sourceTableId,
+ String sourceTablePath,
+ Map<String, List<ValidationResult>> fieldResults,
+ String errorTable) {
+
+ try {
+ String validationErrorsJson = generateValidationErrorsJson(fieldResults);
+ String originalDataJson = generateOriginalDataJson(originalRow, originalRowType);
+ SeaTunnelRow errorRow = new SeaTunnelRow(5);
+ errorRow.setField(0, sourceTableId);
+ errorRow.setField(1, sourceTablePath);
+ errorRow.setField(2, originalDataJson);
+ errorRow.setField(3, validationErrorsJson);
+ errorRow.setField(4, LocalDateTime.now());
+ errorRow.setTableId(errorTable);
+
+ return errorRow;
+
+ } catch (Exception e) {
+ log.error("Failed to generate unified error row", e);
+ throw new RuntimeException("Failed to generate unified error row", e);
+ }
+ }
+
+ private String generateValidationErrorsJson(Map<String, List<ValidationResult>> fieldResults) {
+ List<Map<String, Object>> errorsList = new ArrayList<>();
+
+ for (Map.Entry<String, List<ValidationResult>> entry : fieldResults.entrySet()) {
+ String fieldName = entry.getKey();
+ List<ValidationResult> results = entry.getValue();
+
+ for (ValidationResult result : results) {
+ if (!result.isValid()) {
+ Map<String, Object> errorObj = new HashMap<>();
+ errorObj.put("field_name", fieldName);
+ errorObj.put("error_message", result.getErrorMessage());
+ errorsList.add(errorObj);
+ }
+ }
+ }
+
+ return JsonUtils.toJsonString(errorsList);
+ }
+
+ private String generateOriginalDataJson(
+ SeaTunnelRow originalRow, SeaTunnelRowType originalRowType) {
+ Map<String, Object> rowMap = new HashMap<>();
+
+ for (int i = 0; i < originalRow.getFields().length; i++) {
+ String fieldName = originalRowType.getFieldName(i);
+ Object fieldValue = originalRow.getField(i);
+ rowMap.put(fieldName, fieldValue);
+ }
+
+ return JsonUtils.toJsonString(rowMap);
+ }
+
+ private TableSchema createErrorSchema() {
+ List<Column> columns =
+ Arrays.asList(
+ PhysicalColumn.of(
+ SOURCE_TABLE_ID,
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ "Source table identifier"),
+ PhysicalColumn.of(
+ SOURCE_TABLE_PATH,
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ "Source table path"),
+ PhysicalColumn.of(
+ ORIGINAL_DATA,
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ "JSON representation of the problematic row"),
+ PhysicalColumn.of(
+ VALIDATION_ERRORS,
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ "JSON array of validation error details"),
+ PhysicalColumn.of(
+ CREATE_TIME,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ (Long) null,
+ false,
+ null,
+ "Create time of validation error"));
+
+ return TableSchema.builder().columns(columns).build();
+ }
+}
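Note: in `ROUTE_TO_TABLE` mode each error row carries `validation_errors` as a JSON array with one object per failed rule, holding `field_name` and `error_message`. The patch serializes this with `JsonUtils.toJsonString`. The hypothetical sketch below builds the same shape by hand so the format is visible without SeaTunnel on the classpath. It takes an ordered map so the output is stable. The transform itself collects `fieldResults` and each error object in a `HashMap`, so field order and key order in the real column are not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the validation_errors column: one JSON object per failed rule.
final class ValidationErrorsJson {

    // Escapes the characters JSON forbids unescaped inside a string literal.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** fieldErrors maps a field name to the messages of the rules it failed. */
    static String toJson(Map<String, List<String>> fieldErrors) {
        List<String> objects = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : fieldErrors.entrySet()) {
            for (String message : entry.getValue()) {
                objects.add("{\"field_name\":" + quote(entry.getKey())
                        + ",\"error_message\":" + quote(message) + "}");
            }
        }
        return "[" + String.join(",", objects) + "]";
    }
}
```

Fields whose rules all passed contribute nothing, so a row that failed only NOT_NULL on `name` yields a one-element array.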
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransformConfig.java
new file mode 100644
index 0000000000..2bdbb742b8
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransformConfig.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.transform.validator.rule.LengthValidationRule;
+import org.apache.seatunnel.transform.validator.rule.NotNullValidationRule;
+import org.apache.seatunnel.transform.validator.rule.RangeValidationRule;
+import org.apache.seatunnel.transform.validator.rule.RegexValidationRule;
+import org.apache.seatunnel.transform.validator.rule.UDFValidationRule;
+import org.apache.seatunnel.transform.validator.rule.ValidationRule;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Slf4j
+public class DataValidatorTransformConfig implements Serializable {
+
+ public static final Option<List<Map<String, Object>>> FIELD_RULES =
+ Options.key("field_rules")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription("Field validation rules");
+
+ private List<FieldValidationRule> fieldRules = new ArrayList<>();
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class FieldValidationRule implements Serializable {
+
+ @JsonAlias("field_name")
+ private String fieldName;
+
+ @JsonAlias("rules")
+ private List<ValidationRule> rules = new ArrayList<>();
+ }
+
+ public static DataValidatorTransformConfig of(ReadonlyConfig config) {
+ DataValidatorTransformConfig validatorConfig = new DataValidatorTransformConfig();
+ List<Map<String, Object>> fieldRulesMap = config.get(FIELD_RULES);
+ List<FieldValidationRule> fieldRules = parseFieldRules(fieldRulesMap);
+ validatorConfig.setFieldRules(fieldRules);
+
+ return validatorConfig;
+ }
+
+ private static List<FieldValidationRule> parseFieldRules(
+ List<Map<String, Object>> fieldRulesMap) {
+ List<FieldValidationRule> fieldRules = new ArrayList<>();
+
+ for (Map<String, Object> ruleMap : fieldRulesMap) {
+ String fieldName = (String) ruleMap.get("field_name");
+ if (fieldName == null) {
+ log.warn("Field name is missing in rule configuration: {}", ruleMap);
+ continue;
+ }
+
+ FieldValidationRule fieldRule = new FieldValidationRule();
+ fieldRule.setFieldName(fieldName);
+ Object rulesObj = ruleMap.get("rules");
+ if (rulesObj != null) {
+ List<ValidationRule> rules = parseNestedRules(rulesObj);
+ fieldRule.setRules(rules);
+ fieldRules.add(fieldRule);
+ } else {
+ ValidationRule validationRule = parseValidationRuleFromMap(ruleMap);
+ if (validationRule != null) {
+ fieldRule.setRules(Lists.newArrayList(validationRule));
+ fieldRules.add(fieldRule);
+ }
+ }
+ }
+ return groupFlatRulesByField(fieldRules);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static List<ValidationRule> parseNestedRules(Object rulesObj) {
+ List<ValidationRule> rules = new ArrayList<>();
+
+ try {
+ if (rulesObj instanceof List) {
+ List<Object> rulesList = (List<Object>) rulesObj;
+ for (Object ruleObj : rulesList) {
+ if (ruleObj instanceof Map) {
+ Map<String, Object> ruleMap = (Map<String, Object>) ruleObj;
+ // Parse rule using the same logic as flat format
+ ValidationRule rule = parseValidationRuleFromMap(ruleMap);
+ if (rule != null) {
+ rules.add(rule);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to parse nested validation rules: {}", rulesObj, e);
+ }
+
+ return rules;
+ }
+
+ private static List<FieldValidationRule> groupFlatRulesByField(
+ List<FieldValidationRule> fieldRules) {
+ Map<String, List<ValidationRule>> fieldRulesGroup = new HashMap<>();
+
+ for (FieldValidationRule fieldRule : fieldRules) {
+ String fieldName = fieldRule.getFieldName();
+ List<ValidationRule> existingRules = fieldRulesGroup.get(fieldName);
+ if (existingRules == null) {
+ fieldRulesGroup.put(fieldName, new ArrayList<>(fieldRule.getRules()));
+ } else {
+ existingRules.addAll(fieldRule.getRules());
+ }
+ }
+
+ List<FieldValidationRule> groupedRules = new ArrayList<>();
+ for (Map.Entry<String, List<ValidationRule>> entry : fieldRulesGroup.entrySet()) {
+ FieldValidationRule fieldRule = new FieldValidationRule();
+ fieldRule.setFieldName(entry.getKey());
+ fieldRule.setRules(entry.getValue());
+ groupedRules.add(fieldRule);
+ }
+
+ return groupedRules;
+ }
+
+ private static ValidationRule parseValidationRuleFromMap(Map<String, Object> ruleData) {
+ Object ruleTypeObj = ruleData.get("rule_type");
+ if (ruleTypeObj == null) {
+ log.warn("Rule type is missing in rule configuration: {}", ruleData);
+ return null;
+ }
+
+ String ruleType = String.valueOf(ruleTypeObj).toUpperCase();
+
+ try {
+ switch (ruleType) {
+ case "NOT_NULL":
+ return parseNotNullRuleFromMap(ruleData);
+ case "RANGE":
+ return parseRangeRuleFromMap(ruleData);
+ case "LENGTH":
+ return parseLengthRuleFromMap(ruleData);
+ case "REGEX":
+ return parseRegexRuleFromMap(ruleData);
+ case "UDF":
+ return parseUDFRuleFromMap(ruleData);
+ default:
+ log.warn(
+ "Unknown validation rule type: {}. Supported types: NOT_NULL, RANGE, LENGTH, REGEX, UDF",
+ ruleType);
+ return null;
+ }
+ } catch (Exception e) {
+ log.error("Failed to parse validation rule of type '{}': {}", ruleType, ruleData, e);
+ return null;
+ }
+ }
+
+ private static NotNullValidationRule parseNotNullRuleFromMap(Map<String, Object> ruleData) {
+ try {
+ NotNullValidationRule rule = new NotNullValidationRule();
+ Object customMessage = ruleData.get("custom_message");
+ if (customMessage != null) {
+ rule.setCustomMessage(String.valueOf(customMessage));
+ }
+ log.debug("Successfully parsed NOT_NULL rule: {}", rule);
+ return rule;
+ } catch (Exception e) {
+ log.error("Failed to parse NOT_NULL rule from data: {}", ruleData, e);
+ throw e;
+ }
+ }
+
+ private static RangeValidationRule parseRangeRuleFromMap(Map<String, Object> ruleData) {
+ try {
+ RangeValidationRule rule = new RangeValidationRule();
+
+ Object minValue = ruleData.get("min_value");
+ if (minValue != null) {
+ rule.setMinValue(parseComparable(String.valueOf(minValue)));
+ }
+
+ Object maxValue = ruleData.get("max_value");
+ if (maxValue != null) {
+ rule.setMaxValue(parseComparable(String.valueOf(maxValue)));
+ }
+
+ Object minInclusive = ruleData.get("min_inclusive");
+ if (minInclusive != null) {
+ rule.setMinInclusive(parseBooleanValue(minInclusive));
+ }
+
+ Object maxInclusive = ruleData.get("max_inclusive");
+ if (maxInclusive != null) {
+ rule.setMaxInclusive(parseBooleanValue(maxInclusive));
+ }
+
+ Object customMessage = ruleData.get("custom_message");
+ if (customMessage != null) {
+ rule.setCustomMessage(String.valueOf(customMessage));
+ }
+
+ log.debug("Successfully parsed RANGE rule: {}", rule);
+ return rule;
+ } catch (Exception e) {
+ log.error("Failed to parse RANGE rule from data: {}", ruleData, e);
+ throw e;
+ }
+ }
+
+ private static LengthValidationRule parseLengthRuleFromMap(Map<String, Object> ruleData) {
+ try {
+ LengthValidationRule rule = new LengthValidationRule();
+
+ Object minLength = ruleData.get("min_length");
+ if (minLength != null) {
+ rule.setMinLength(parseIntegerValue(minLength));
+ }
+
+ Object maxLength = ruleData.get("max_length");
+ if (maxLength != null) {
+ rule.setMaxLength(parseIntegerValue(maxLength));
+ }
+
+ Object exactLength = ruleData.get("exact_length");
+ if (exactLength != null) {
+ rule.setExactLength(parseIntegerValue(exactLength));
+ }
+
+ Object customMessage = ruleData.get("custom_message");
+ if (customMessage != null) {
+ rule.setCustomMessage(String.valueOf(customMessage));
+ }
+
+ log.debug("Successfully parsed LENGTH rule: {}", rule);
+ return rule;
+ } catch (Exception e) {
+ log.error("Failed to parse LENGTH rule from data: {}", ruleData, e);
+ throw e;
+ }
+ }
+
+ private static RegexValidationRule parseRegexRuleFromMap(Map<String, Object> ruleData) {
+ try {
+ RegexValidationRule rule = new RegexValidationRule();
+
+ Object pattern = ruleData.get("pattern");
+ if (pattern != null) {
+ rule.setPattern(String.valueOf(pattern));
+ } else {
+ throw new IllegalArgumentException("Pattern is required for REGEX rule");
+ }
+
+ Object caseSensitive = ruleData.get("case_sensitive");
+ if (caseSensitive != null) {
+ rule.setCaseSensitive(parseBooleanValue(caseSensitive));
+ }
+
+ Object customMessage = ruleData.get("custom_message");
+ if (customMessage != null) {
+ rule.setCustomMessage(String.valueOf(customMessage));
+ }
+
+ log.debug("Successfully parsed REGEX rule: {}", rule);
+ return rule;
+ } catch (Exception e) {
+ log.error("Failed to parse REGEX rule from data: {}", ruleData, e);
+ throw e;
+ }
+ }
+
+ private static Comparable parseComparable(String value) {
+ if (value == null || value.trim().isEmpty()) {
+ return value;
+ }
+
+ String trimmedValue = value.trim();
+ try {
+ if (trimmedValue.contains(".")) {
+ return Double.parseDouble(trimmedValue);
+ } else {
+ long longValue = Long.parseLong(trimmedValue);
+ if (longValue >= Integer.MIN_VALUE && longValue <= Integer.MAX_VALUE) {
+ return (int) longValue;
+ }
+ return longValue;
+ }
+ } catch (NumberFormatException e) {
+ log.debug("Value '{}' is not a number, treating as string", value);
+ return value;
+ }
+ }
+
+ private static boolean parseBooleanValue(Object value) {
+ if (value == null) {
+ return false;
+ }
+ if (value instanceof Boolean) {
+ return (Boolean) value;
+ }
+ String stringValue = String.valueOf(value).trim().toLowerCase();
+ return "true".equals(stringValue) || "1".equals(stringValue) || "yes".equals(stringValue);
+ }
+
+ private static Integer parseIntegerValue(Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ try {
+ return Integer.parseInt(String.valueOf(value).trim());
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid integer value: " + value, e);
+ }
+ }
+
+ private static UDFValidationRule parseUDFRuleFromMap(Map<String, Object> ruleData) {
+ try {
+ UDFValidationRule rule = new UDFValidationRule();
+
+ Object functionName = ruleData.get("function_name");
+ if (functionName != null) {
+ rule.setFunctionName(String.valueOf(functionName));
+ } else {
+ throw new IllegalArgumentException("function_name is required for UDF rule");
+ }
+
+ Object customMessage = ruleData.get("custom_message");
+ if (customMessage != null) {
+ rule.setCustomMessage(String.valueOf(customMessage));
+ }
+
+ log.debug("Successfully parsed UDF rule: {}", rule);
+ return rule;
+ } catch (Exception e) {
+ log.error("Failed to parse UDF rule from data: {}", ruleData, e);
+ throw e;
+ }
+ }
+}
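The numeric narrowing in `parseComparable` decides which Java type a RANGE bound ends up with, which matters later when the bound is compared against field values. The standalone sketch below copies that logic into a hypothetical class, `ParseComparableSketch` (not part of the patch), so the resulting types can be inspected directly.

```java
public class ParseComparableSketch {

    // Copy of the narrowing logic in parseComparable from the patch:
    // strings containing '.' become Double, integers that fit in 32 bits become
    // Integer, larger integers become Long, and anything unparseable stays a String.
    @SuppressWarnings("rawtypes")
    static Comparable parseComparable(String value) {
        if (value == null || value.trim().isEmpty()) {
            return value;
        }
        String trimmed = value.trim();
        try {
            if (trimmed.contains(".")) {
                return Double.parseDouble(trimmed);
            }
            long longValue = Long.parseLong(trimmed);
            if (longValue >= Integer.MIN_VALUE && longValue <= Integer.MAX_VALUE) {
                return (int) longValue; // boxed to Integer
            }
            return longValue; // boxed to Long
        } catch (NumberFormatException e) {
            return value; // not a number: compared as a String
        }
    }

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        String[] inputs = {"18", "5000000000", "99.5", "N/A"};
        for (String input : inputs) {
            Comparable parsed = parseComparable(input);
            System.out.println(input + " -> " + parsed.getClass().getSimpleName());
        }
    }
}
```

Running `main` prints `18 -> Integer`, `5000000000 -> Long`, `99.5 -> Double` and `N/A -> String`, so a bound appears to be typed by how it is written in the config rather than by the type of the field it is checked against.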
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransformFactory.java
new file mode 100644
index 0000000000..f3d3760d46
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/DataValidatorTransformFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.transform.validator.DataValidatorTransformConfig.FIELD_RULES;
+
+/** Factory for creating DataValidator Transform instances. */
+@AutoService(Factory.class)
+public class DataValidatorTransformFactory implements TableTransformFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return DataValidatorTransform.PLUGIN_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(FIELD_RULES)
+ .optional(TransformCommonOptions.MULTI_TABLES)
+ .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+ .optional(TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION)
+ .optional(TransformCommonOptions.ERROR_TABLE_OPTION)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableTransformFactoryContext context) {
+ return () ->
+ new DataValidatorTransform(context.getOptions(), context.getCatalogTables().get(0));
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/FieldValidator.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/FieldValidator.java
new file mode 100644
index 0000000000..0e9aa5e4cf
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/FieldValidator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.rule.ValidationRule;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Validator for a specific field, containing multiple validation rules. */
+@Data
+public class FieldValidator implements Serializable {
+ private final String fieldName;
+ private final int fieldIndex;
+ private final SeaTunnelDataType<?> fieldDataType;
+ private final List<ValidationRule> rules;
+
+ public FieldValidator(
+ String fieldName,
+ int fieldIndex,
+ SeaTunnelDataType<?> fieldDataType,
+ List<ValidationRule> rules) {
+ this.fieldName = fieldName;
+ this.fieldIndex = fieldIndex;
+ this.fieldDataType = fieldDataType;
+ this.rules = rules != null ? rules : new ArrayList<>();
+ }
+
+ /**
+ * Validate the field value using all configured rules.
+ *
+ * @param fieldValue the value to validate
+ * @param context validation context
+ * @param failFast whether to stop on first failure
+ * @return list of validation results
+ */
+ public List<ValidationResult> validate(
+ Object fieldValue, ValidationContext context, boolean failFast) {
+ List<ValidationResult> results = new ArrayList<>();
+
+ for (ValidationRule rule : rules) {
+ ValidationResult result = rule.validate(fieldValue, fieldDataType, context);
+ results.add(result);
+
+ // If fail fast mode and validation failed, stop here
+ if (failFast && !result.isValid()) {
+ break;
+ }
+ }
+
+ return results;
+ }
+}
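`FieldValidator.validate` keeps a result for every rule it evaluates and, when `failFast` is true, stops right after the first failing rule, so later rules are never run. The self-contained sketch below imitates that loop with hypothetical `Rule` and `Result` stand-ins (not the patch's `ValidationRule` / `ValidationResult`) to show how the number of collected results changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FailFastSketch {

    // Hypothetical, simplified stand-in for ValidationResult.
    static final class Result {
        final boolean valid;
        final String message;

        Result(boolean valid, String message) {
            this.valid = valid;
            this.message = message;
        }
    }

    // Hypothetical, simplified stand-in for ValidationRule.
    interface Rule {
        Result check(Object value);
    }

    // Same loop shape as FieldValidator.validate: every evaluated rule adds a
    // result, and with failFast the loop stops right after the first failure.
    static List<Result> validate(Object value, List<Rule> rules, boolean failFast) {
        List<Result> results = new ArrayList<>();
        for (Rule rule : rules) {
            Result result = rule.check(value);
            results.add(result);
            if (failFast && !result.valid) {
                break;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Rule notNull = v -> new Result(v != null, v != null ? null : "Field cannot be null");
        Rule maxLength5 = v -> {
            boolean ok = v == null || v.toString().length() <= 5;
            return new Result(ok, ok ? null : "Length exceeds maximum 5");
        };
        Rule digitsOnly = v -> {
            boolean ok = v == null || v.toString().matches("\\d+");
            return new Result(ok, ok ? null : "Value is not numeric");
        };
        List<Rule> rules = Arrays.asList(notNull, maxLength5, digitsOnly);

        // "abcdefg" passes notNull but fails both maxLength5 and digitsOnly.
        System.out.println(validate("abcdefg", rules, true).size());  // 2
        System.out.println(validate("abcdefg", rules, false).size()); // 3
    }
}
```

With fail-fast enabled only the first failure is reported, which is cheaper per row but hides any additional problems with the same field.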
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationContext.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationContext.java
new file mode 100644
index 0000000000..34ae96aee3
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Context information for validation operations. */
+@Data
+public class ValidationContext implements Serializable {
+ private final SeaTunnelRow currentRow;
+ private final SeaTunnelRowType rowType;
+ private final Map<String, Object> globalContext;
+ private final String currentFieldName;
+
+ public ValidationContext(
+ SeaTunnelRow currentRow,
+ SeaTunnelRowType rowType,
+ Map<String, Object> globalContext,
+ String currentFieldName) {
+ this.currentRow = currentRow;
+ this.rowType = rowType;
+ this.globalContext = globalContext != null ? globalContext : new HashMap<>();
+ this.currentFieldName = currentFieldName;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationResult.java
similarity index 52%
copy from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
copy to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationResult.java
index 149bc96e9c..75b3b138bf 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationResult.java
@@ -15,24 +15,36 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.exception;
+package org.apache.seatunnel.transform.validator;
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-import java.util.Map;
+import java.io.Serializable;
-public class TransformException extends SeaTunnelRuntimeException {
- public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
- super(seaTunnelErrorCode, errorMessage);
- }
+/** Result of a validation operation. */
+@Data
+@AllArgsConstructor
+public class ValidationResult implements Serializable {
+ private boolean valid;
+ private String errorMessage;
- TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
- super(seaTunnelErrorCode, params);
+ /**
+ * Create a successful validation result.
+ *
+ * @return success result
+ */
+ public static ValidationResult success() {
+ return new ValidationResult(true, null);
}
- TransformException(
- SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params, Throwable cause) {
- super(seaTunnelErrorCode, params, cause);
+ /**
+ * Create a failed validation result.
+ *
+ * @param message error message
+ * @return failure result
+ */
+ public static ValidationResult failure(String message) {
+ return new ValidationResult(false, message);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationResultHandler.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationResultHandler.java
new file mode 100644
index 0000000000..e67610e961
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/ValidationResultHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Handler for processing validation results and generating output. */
+public class ValidationResultHandler implements Serializable {
+
+ public ValidationResultHandler() {}
+
+ /**
+ * Process validation results for all fields and generate final result.
+ *
+ * @param inputRow original input row
+ * @param fieldResults validation results for each field
+ * @return processed validation result
+ */
+ public ValidationProcessResult processResults(
+ SeaTunnelRow inputRow, Map<String, List<ValidationResult>> fieldResults) {
+
+ ValidationProcessResult result = new ValidationProcessResult();
+ result.setOriginalRow(inputRow);
+ int failedValidations = 0;
+ List<String> errorMessages = new ArrayList<>();
+
+ for (Map.Entry<String, List<ValidationResult>> entry : fieldResults.entrySet()) {
+ String fieldName = entry.getKey();
+ List<ValidationResult> results = entry.getValue();
+
+ for (ValidationResult validationResult : results) {
+ if (!validationResult.isValid()) {
+ failedValidations++;
+ errorMessages.add(
+ String.format("%s: %s", fieldName, validationResult.getErrorMessage()));
+ }
+ }
+ }
+ result.setErrorMessages(errorMessages);
+ result.setValid(failedValidations == 0);
+
+ return result;
+ }
+
+ /** Result of validation processing. */
+ @Data
+ public static class ValidationProcessResult implements Serializable {
+ private SeaTunnelRow originalRow;
+ private boolean valid;
+ private List<String> errorMessages = new ArrayList<>();
+ }
+}
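`processResults` flattens per-field results into `field: message` strings and marks the row valid only when no rule failed. The sketch below reproduces that aggregation with a simplified, hypothetical `Result` type; it uses a `LinkedHashMap` so the output order is predictable, whereas the order in the patch depends on whichever `Map` the caller passes in. The two sample messages follow the formats used by `RangeValidationRule` and `NotNullValidationRule`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResultAggregationSketch {

    // Hypothetical, simplified stand-in for ValidationResult.
    static final class Result {
        final boolean valid;
        final String message;

        Result(boolean valid, String message) {
            this.valid = valid;
            this.message = message;
        }
    }

    // Mirrors ValidationResultHandler.processResults: each failed result becomes
    // a "field: message" string; the row is valid only if the list stays empty.
    static List<String> collectErrors(Map<String, List<Result>> fieldResults) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, List<Result>> entry : fieldResults.entrySet()) {
            for (Result result : entry.getValue()) {
                if (!result.valid) {
                    errors.add(String.format("%s: %s", entry.getKey(), result.message));
                }
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so messages come out in field order.
        Map<String, List<Result>> fieldResults = new LinkedHashMap<>();
        fieldResults.put("age", Arrays.asList(
                new Result(true, null), new Result(false, "Value 150 exceeds maximum 120")));
        fieldResults.put("email", Arrays.asList(new Result(false, "Field cannot be null")));
        fieldResults.put("name", Arrays.asList(new Result(true, null)));

        List<String> errors = collectErrors(fieldResults);
        System.out.println("row valid: " + errors.isEmpty()); // row valid: false
        errors.forEach(System.out::println);
        // age: Value 150 exceeds maximum 120
        // email: Field cannot be null
    }
}
```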
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/LengthValidationRule.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/LengthValidationRule.java
new file mode 100644
index 0000000000..7c9bde06c4
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/LengthValidationRule.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.rule;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Collection;
+
+/** Validation rule to check the length of string, array, or collection values. */
+@Data
+@NoArgsConstructor
+public class LengthValidationRule implements ValidationRule {
+
+ @JsonAlias("min_length")
+ private Integer minLength;
+
+ @JsonAlias("max_length")
+ private Integer maxLength;
+
+ @JsonAlias("exact_length")
+ private Integer exactLength;
+
+ @JsonAlias("custom_message")
+ private String customMessage;
+
+ public LengthValidationRule(Integer minLength, Integer maxLength) {
+ this.minLength = minLength;
+ this.maxLength = maxLength;
+ }
+
+ public LengthValidationRule(Integer exactLength) {
+ this.exactLength = exactLength;
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+ if (value == null) {
+ return ValidationResult.success();
+ }
+
+ int length = getLength(value);
+
+ if (exactLength != null && length != exactLength) {
+ return ValidationResult.failure(
+ customMessage != null
+ ? customMessage
+ : String.format("Expected length %d but got %d", exactLength, length));
+ }
+
+ if (minLength != null && length < minLength) {
+ return ValidationResult.failure(
+ customMessage != null
+ ? customMessage
+ : String.format("Length %d is below minimum %d", length, minLength));
+ }
+
+ if (maxLength != null && length > maxLength) {
+ return ValidationResult.failure(
+ customMessage != null
+ ? customMessage
+ : String.format("Length %d exceeds maximum %d", length, maxLength));
+ }
+
+ return ValidationResult.success();
+ }
+
+ @Override
+ public String getRuleName() {
+ return "LENGTH";
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return customMessage != null ? customMessage : "Length validation failed";
+ }
+
+ private int getLength(Object value) {
+ if (value instanceof String) {
+ return ((String) value).length();
+ }
+ if (value instanceof byte[]) {
+ return ((byte[]) value).length;
+ }
+ if (value instanceof Collection) {
+ return ((Collection<?>) value).size();
+ }
+ return value.toString().length();
+ }
+}
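`getLength` measures strings with `String.length()`, which counts UTF-16 code units rather than characters, and among arrays it only special-cases `byte[]`; any other array falls through to `toString()`, which for a Java array yields a type-and-hash string rather than its contents. The sketch below reproduces the patch's branches in a hypothetical `LengthSketch` class and adds one extra branch, marked as hypothetical, that counts the elements of arbitrary arrays via `java.lang.reflect.Array.getLength`. Whether array-typed fields actually reach this rule as Java arrays is an assumption, not something shown in this excerpt.

```java
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;

public class LengthSketch {

    // Same branches as LengthValidationRule.getLength, plus one hypothetical
    // branch that measures any Java array (not only byte[]) by element count.
    static int lengthOf(Object value) {
        if (value instanceof String) {
            return ((String) value).length(); // UTF-16 code units, not code points
        }
        if (value instanceof byte[]) {
            return ((byte[]) value).length;
        }
        if (value instanceof Collection) {
            return ((Collection<?>) value).size();
        }
        if (value.getClass().isArray()) {
            return Array.getLength(value); // hypothetical addition, not in the patch
        }
        return value.toString().length();
    }

    public static void main(String[] args) {
        String emoji = "\uD83D\uDE00"; // one emoji (U+1F600), stored as a surrogate pair
        System.out.println(lengthOf(emoji));                             // 2
        System.out.println(emoji.codePointCount(0, emoji.length()));     // 1
        System.out.println(lengthOf("\u00E9".getBytes(StandardCharsets.UTF_8))); // 2
        System.out.println(lengthOf(new String[] {"a", "b", "c"}));      // 3
        System.out.println(lengthOf(Arrays.asList(1, 2)));               // 2
        System.out.println(lengthOf(12345));                             // 5
    }
}
```

If length limits are meant as "characters a user sees", `codePointCount` would be the closer measure; the patch as written uses `length()`.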
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/NotNullValidationRule.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/NotNullValidationRule.java
new file mode 100644
index 0000000000..80796259e5
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/NotNullValidationRule.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.rule;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Validation rule to check if a field value is not null. */
+@Data
+@NoArgsConstructor
+public class NotNullValidationRule implements ValidationRule {
+
+ @JsonAlias("custom_message")
+ private String customMessage;
+
+ public NotNullValidationRule(String customMessage) {
+ this.customMessage = customMessage;
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+ if (value == null) {
+ return ValidationResult.failure(
+ customMessage != null ? customMessage : "Field cannot be null");
+ }
+ return ValidationResult.success();
+ }
+
+ @Override
+ public String getRuleName() {
+ return "NOT_NULL";
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return customMessage != null ? customMessage : "Field cannot be null";
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/RangeValidationRule.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/RangeValidationRule.java
new file mode 100644
index 0000000000..03c8158244
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/RangeValidationRule.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.rule;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Validation rule to check if a numeric value is within a specified range. */
+@Data
+@NoArgsConstructor
+public class RangeValidationRule implements ValidationRule {
+
+ @JsonAlias("min_value")
+ private Comparable minValue;
+
+ @JsonAlias("max_value")
+ private Comparable maxValue;
+
+ @JsonAlias("min_inclusive")
+ private boolean minInclusive = true;
+
+ @JsonAlias("max_inclusive")
+ private boolean maxInclusive = true;
+
+ @JsonAlias("custom_message")
+ private String customMessage;
+
+ public RangeValidationRule(Comparable minValue, Comparable maxValue) {
+ this.minValue = minValue;
+ this.maxValue = maxValue;
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+ if (value == null || !(value instanceof Comparable)) {
+ return ValidationResult.success();
+ }
+
+ Comparable comparableValue = (Comparable) value;
+
+ // Check minimum value
+ if (minValue != null) {
+ int minComparison = comparableValue.compareTo(minValue);
+ if (minInclusive ? minComparison < 0 : minComparison <= 0) {
+ return ValidationResult.failure(
+ customMessage != null
+ ? customMessage
+ : String.format("Value %s is below minimum %s", value, minValue));
+ }
+ }
+
+ // Check maximum value
+ if (maxValue != null) {
+ int maxComparison = comparableValue.compareTo(maxValue);
+ if (maxInclusive ? maxComparison > 0 : maxComparison >= 0) {
+ return ValidationResult.failure(
+ customMessage != null
+ ? customMessage
+ : String.format("Value %s exceeds maximum %s", value, maxValue));
+ }
+ }
+
+ return ValidationResult.success();
+ }
+
+ @Override
+ public String getRuleName() {
+ return "RANGE";
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return customMessage != null ? customMessage : "Value out of range";
+ }
+}
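`RangeValidationRule.validate` calls `compareTo` on raw `Comparable` values, so a field value and a bound of different boxed types (for example a `Long` from a BIGINT column against the `Integer` that `parseComparable` returns for `120`) would raise a `ClassCastException` unless the caller converts one side first. `DataValidatorTransform` is not shown in this excerpt, so whether it performs such a conversion is not visible here. The sketch below, with hypothetical class and method names, reproduces the raw comparison and shows one possible alternative that widens both numbers to `BigDecimal` before comparing.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class RangeCompareSketch {

    // Raw comparison as in RangeValidationRule: only safe when both sides share
    // a type, because Long.compareTo expects a Long argument.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static int rawCompare(Comparable value, Comparable bound) {
        return value.compareTo(bound);
    }

    // Hypothetical alternative: widen both numbers to BigDecimal first.
    static int numericCompare(Number value, Number bound) {
        return toBigDecimal(value).compareTo(toBigDecimal(bound));
    }

    // Assumes integral values are BigInteger or fit in a long.
    static BigDecimal toBigDecimal(Number n) {
        if (n instanceof BigDecimal) {
            return (BigDecimal) n;
        }
        if (n instanceof BigInteger) {
            return new BigDecimal((BigInteger) n);
        }
        if (n instanceof Double || n instanceof Float) {
            return BigDecimal.valueOf(n.doubleValue());
        }
        return BigDecimal.valueOf(n.longValue());
    }

    public static void main(String[] args) {
        Long age = 150L;   // e.g. a BIGINT field value
        Integer max = 120; // what parseComparable returns for "120"
        try {
            rawCompare(age, max);
        } catch (ClassCastException e) {
            System.out.println("raw compare failed: " + e.getClass().getSimpleName());
        }
        System.out.println(numericCompare(age, max) > 0);  // true: 150 exceeds 120
        System.out.println(numericCompare(99.5, 100) < 0); // true
    }
}
```

Widening to `BigDecimal` trades a little per-row allocation for type-independent comparison; converting bounds to the field's declared type once, at configuration time, would be another option.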
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/RegexValidationRule.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/RegexValidationRule.java
new file mode 100644
index 0000000000..01fdb914ee
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/RegexValidationRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.rule;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.regex.Pattern;
+
+/** Validation rule to check if a string value matches a regular expression pattern. */
+@Data
+@NoArgsConstructor
+public class RegexValidationRule implements ValidationRule {
+
+ @JsonAlias("pattern")
+ private String pattern;
+
+ @JsonAlias("case_sensitive")
+ private boolean caseSensitive = true;
+
+ @JsonAlias("custom_message")
+ private String customMessage;
+
+ private transient Pattern compiledPattern;
+
+ public RegexValidationRule(String pattern) {
+ this.pattern = pattern;
+ compilePattern();
+ }
+
+ public RegexValidationRule(String pattern, boolean caseSensitive) {
+ this.pattern = pattern;
+ this.caseSensitive = caseSensitive;
+ compilePattern();
+ }
+
+ private void compilePattern() {
+ if (pattern != null) {
+ int flags = caseSensitive ? 0 : Pattern.CASE_INSENSITIVE;
+ this.compiledPattern = Pattern.compile(pattern, flags);
+ }
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+ if (value == null) {
+ return ValidationResult.success();
+ }
+
+ if (compiledPattern == null) {
+ compilePattern();
+ }
+
+ String stringValue = value.toString();
+ if (!compiledPattern.matcher(stringValue).matches()) {
+ return ValidationResult.failure(
+ customMessage != null
+ ? customMessage
+ : String.format(
+ "Value '%s' does not match pattern '%s'",
+ stringValue, pattern));
+ }
+
+ return ValidationResult.success();
+ }
+
+ @Override
+ public String getRuleName() {
+ return "REGEX";
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return customMessage != null ? customMessage : "Pattern validation failed";
+ }
+}
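`RegexValidationRule` checks values with `Matcher.matches()`, which requires the whole string to match the pattern (unlike `find()`, which accepts any matching substring), and `case_sensitive = false` maps to `Pattern.CASE_INSENSITIVE`. The sketch below reuses the same compilation step in a hypothetical `RegexSketch` class to show both effects; the `sku-\d+` pattern is only an illustration.

```java
import java.util.regex.Pattern;

public class RegexSketch {

    // Same flag handling as RegexValidationRule.compilePattern.
    static Pattern compile(String pattern, boolean caseSensitive) {
        int flags = caseSensitive ? 0 : Pattern.CASE_INSENSITIVE;
        return Pattern.compile(pattern, flags);
    }

    public static void main(String[] args) {
        Pattern sku = compile("sku-\\d+", true);
        System.out.println(sku.matcher("sku-42").matches());  // true
        System.out.println(sku.matcher("sku-42x").matches()); // false: whole value must match
        System.out.println(sku.matcher("sku-42x").find());    // true: find() accepts a substring
        System.out.println(sku.matcher("SKU-42").matches());  // false: case-sensitive

        Pattern skuAnyCase = compile("sku-\\d+", false);
        System.out.println(skuAnyCase.matcher("SKU-42").matches()); // true
    }
}
```

Because `matches()` anchors implicitly, patterns in the rule config do not need explicit `^` and `$`.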
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/UDFValidationRule.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/UDFValidationRule.java
new file mode 100644
index 0000000000..a5f82522a7
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/UDFValidationRule.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.rule;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+import org.apache.seatunnel.transform.validator.udf.DataValidatorUDF;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ServiceLoader;
+
+/**
+ * Validation rule that delegates to a user-defined function (UDF) for custom business-logic
+ * validation. Like the built-in rules, the UDF is invoked per field with the field value, its data
+ * type and the {@link ValidationContext}.
+ */
+@Data
+@NoArgsConstructor
+@Slf4j
+public class UDFValidationRule implements ValidationRule {
+
+ @JsonAlias("function_name")
+ private String functionName;
+
+ @JsonAlias("custom_message")
+ private String customMessage;
+
+ private transient DataValidatorUDF udfInstance;
+
+ public UDFValidationRule(String functionName) {
+ this.functionName = functionName;
+ loadUDF();
+ }
+
+ public UDFValidationRule(String functionName, String customMessage) {
+ this.functionName = functionName;
+ this.customMessage = customMessage;
+ loadUDF();
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+
+ if (udfInstance == null) {
+ loadUDF();
+ }
+
+ if (udfInstance == null) {
+ String errorMsg = String.format("DataValidatorUDF '%s' not found", functionName);
+ log.error(errorMsg);
+ return ValidationResult.failure(customMessage != null ? customMessage : errorMsg);
+ }
+
+ try {
+ // For UDF validation, we validate the field value like other validation rules
+ ValidationResult result = udfInstance.validate(value, dataType, context);
+
+ // If UDF validation fails and we have a custom message, use it
+ if (!result.isValid() && customMessage != null) {
+ return ValidationResult.failure(customMessage);
+ }
+
+ return result;
+ } catch (Exception e) {
+ String errorMsg =
+ String.format(
+ "Error executing DataValidatorUDF '%s': %s",
+ functionName, e.getMessage());
+ log.error(errorMsg, e);
+ return ValidationResult.failure(customMessage != null ? customMessage : errorMsg);
+ }
+ }
+
+ @Override
+ public String getRuleName() {
+ return "UDF";
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return customMessage != null
+ ? customMessage
+ : String.format("UDF validation failed: %s", functionName);
+ }
+
+ /**
+ * Load the UDF instance using the ServiceLoader mechanism. This method iterates over all available
+ * DataValidatorUDF implementations and keeps the first one whose function name matches
+ * (case-insensitively).
+ */
+ private void loadUDF() {
+ if (functionName == null || functionName.trim().isEmpty()) {
+ log.warn("Function name is null or empty, cannot load UDF");
+ return;
+ }
+
+ try {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ ServiceLoader<DataValidatorUDF> serviceLoader =
+ ServiceLoader.load(DataValidatorUDF.class, classLoader);
+
+ for (DataValidatorUDF udf : serviceLoader) {
+ if (functionName.equalsIgnoreCase(udf.functionName())) {
+ this.udfInstance = udf;
+ log.info("Successfully loaded DataValidatorUDF: {}", functionName);
+ return;
+ }
+ }
+
+ log.warn("DataValidatorUDF '{}' not found in classpath", functionName);
+ } catch (Exception e) {
+ log.error("Failed to load DataValidatorUDF '{}': {}", functionName, e.getMessage(), e);
+ }
+ }
+}
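`udfInstance` is declared `transient`, so Java serialization skips it. `ValidationRule` extends `Serializable`, presumably so rules can travel with the transform. A deserialized UDFValidationRule therefore holds `null` in that field, and the null check at the top of `validate` reloads it. The minimal sketch below reproduces this pattern with plain Java serialization. `LazyRule` and its `String` stand-in for the UDF are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for UDFValidationRule's transient, lazily loaded UDF field. */
public class LazyRule implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String functionName;
    // Skipped by Java serialization; null again after deserialization.
    private transient String resolved;

    public LazyRule(String functionName) {
        this.functionName = functionName;
        load();
    }

    private void load() {
        // The real rule scans ServiceLoader here; this sketch just builds a string.
        resolved = "impl-of-" + functionName;
    }

    /** Mirrors the "if (udfInstance == null) loadUDF();" guard in validate(). */
    public String resolve() {
        if (resolved == null) {
            load();
        }
        return resolved;
    }

    public boolean isLoaded() {
        return resolved != null;
    }

    /** Serializes and deserializes a rule, as a distributed runtime might. */
    public static LazyRule roundTrip(LazyRule rule) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(rule);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyRule) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyRule copy = roundTrip(new LazyRule("EMAIL"));
        System.out.println(copy.isLoaded()); // false: the transient field was not serialized
        System.out.println(copy.resolve());  // impl-of-EMAIL, reloaded on first use
    }
}
```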
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/ValidationRule.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/ValidationRule.java
new file mode 100644
index 0000000000..0062923dce
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/rule/ValidationRule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.rule;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for all validation rules. Each validation rule defines how to validate a specific
+ * aspect of field data.
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "rule_type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = NotNullValidationRule.class, name = "NOT_NULL"),
+ @JsonSubTypes.Type(value = RangeValidationRule.class, name = "RANGE"),
+ @JsonSubTypes.Type(value = LengthValidationRule.class, name = "LENGTH"),
+ @JsonSubTypes.Type(value = RegexValidationRule.class, name = "REGEX"),
+ @JsonSubTypes.Type(value = UDFValidationRule.class, name = "UDF")
+})
+public interface ValidationRule extends Serializable {
+
+ /**
+ * Validate the given value according to this rule.
+ *
+ * @param value the value to validate
+ * @param dataType the data type of the field
+ * @param context the validation context
+ * @return validation result
+ */
+ ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context);
+
+ /**
+ * Get the name of this validation rule.
+ *
+ * @return rule name
+ */
+ String getRuleName();
+
+ /**
+ * Get the default error message for this rule.
+ *
+ * @return error message
+ */
+ String getErrorMessage();
+}
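With `@JsonTypeInfo` and `@JsonSubTypes`, Jackson reads the `rule_type` property of each rule object. It then instantiates the subtype registered under that name: `NOT_NULL`, `RANGE`, `LENGTH`, `REGEX` or `UDF`. The stdlib-only sketch below is a rough illustration of that name-to-implementation dispatch, not Jackson's actual mechanism. `RuleTypeRegistry`, its nested `Rule` interface and the `pattern` property name are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of type-id dispatch; not SeaTunnel or Jackson code. */
public class RuleTypeRegistry {

    /** Minimal rule contract, loosely modelled on ValidationRule. */
    public interface Rule {
        boolean isValid(Object value);
    }

    // Type id (the "rule_type" value) -> factory building a rule from the other properties.
    private final Map<String, Function<Map<String, Object>, Rule>> factories = new HashMap<>();

    public RuleTypeRegistry() {
        factories.put("NOT_NULL", props -> value -> value != null);
        // "pattern" is an assumed property name for this sketch.
        factories.put(
                "REGEX",
                props -> {
                    String pattern = (String) props.get("pattern");
                    return value -> value != null && value.toString().matches(pattern);
                });
    }

    /** Picks the factory by "rule_type", much as Jackson picks the registered subtype. */
    public Rule create(Map<String, Object> props) {
        Object typeId = props.get("rule_type");
        Function<Map<String, Object>, Rule> factory = factories.get(String.valueOf(typeId));
        if (factory == null) {
            throw new IllegalArgumentException("Unknown rule_type: " + typeId);
        }
        return factory.apply(props);
    }
}
```

An unknown type id fails fast here. Jackson likewise rejects an unregistered `rule_type` value by default, so a misspelled rule name surfaces when the configuration is parsed rather than while records are being validated.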
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/udf/DataValidatorUDF.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/udf/DataValidatorUDF.java
new file mode 100644
index 0000000000..907cef980e
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/udf/DataValidatorUDF.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.udf;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import java.io.Serializable;
+
+public interface DataValidatorUDF extends Serializable {
+
+ /**
+ * Get the unique name of this validation function. This name will be used in configuration to
+ * reference this UDF.
+ *
+ * @return function name (should be unique, ignoring case, across all DataValidatorUDFs)
+ */
+ String functionName();
+
+ /**
+ * Validate a single field value using custom business logic specific to that field.
+ *
+ * @param value the field value to validate
+ * @param dataType the data type of the field
+ * @param context validation context containing additional information
+ * @return validation result indicating success, or failure with an error message
+ */
+ ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context);
+
+ /**
+ * Get a description of what this validation function does. This is used for documentation and
+ * error reporting purposes.
+ *
+ * @return description of the validation function
+ */
+ default String getDescription() {
+ return "Custom validation function: " + functionName();
+ }
+}
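UDFValidationRule.loadUDF() iterates over `ServiceLoader.load(DataValidatorUDF.class, classLoader)`. It keeps the first implementation whose `functionName()` equals the configured name, ignoring case. In practice, a name returned by `functionName()` should therefore be unique case-insensitively, not just as an exact string. The standalone sketch below reproduces that lookup. It is written against any `Iterable`, so it accepts a `ServiceLoader` as well as a plain list in tests. `UdfLookup` and `NamedFunction` are hypothetical.

```java
import java.util.Optional;

/** Hypothetical sketch of the name lookup performed by UDFValidationRule.loadUDF(). */
public class UdfLookup {

    /** Minimal stand-in for DataValidatorUDF: only the name matters here. */
    public interface NamedFunction {
        String functionName();
    }

    /**
     * Returns the first provider whose name matches, ignoring case. A ServiceLoader<T> is itself
     * an Iterable<T>, so one can be passed directly when T implements NamedFunction.
     */
    public static <T extends NamedFunction> Optional<T> find(Iterable<T> providers, String name) {
        // Same guard as loadUDF(): blank names never match anything.
        if (name == null || name.trim().isEmpty()) {
            return Optional.empty();
        }
        for (T provider : providers) {
            if (name.equalsIgnoreCase(provider.functionName())) {
                return Optional.of(provider); // first match wins; later ones are ignored
            }
        }
        return Optional.empty();
    }
}
```

Because iteration stops at the first hit, two providers named `EMAIL` and `Email` would shadow each other, depending on the order in which the providers are discovered.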
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/udf/EmailValidator.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/udf/EmailValidator.java
new file mode 100644
index 0000000000..4bd2ff71e8
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/validator/udf/EmailValidator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.validator.udf;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.validator.ValidationContext;
+import org.apache.seatunnel.transform.validator.ValidationResult;
+
+import com.google.auto.service.AutoService;
+
+import java.util.regex.Pattern;
+
+@AutoService(DataValidatorUDF.class)
+public class EmailValidator implements DataValidatorUDF {
+
+ private static final Pattern DOMAIN_PATTERN = Pattern.compile("^[a-zA-Z0-9.-]+$");
+
+ private static final int MAX_EMAIL_LENGTH = 254;
+ private static final int MAX_LOCAL_PART_LENGTH = 63;
+
+ @Override
+ public String functionName() {
+ return "EMAIL";
+ }
+
+ @Override
+ public ValidationResult validate(
+ Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
+ // Skip validation if value is null
+ if (value == null) {
+ return ValidationResult.success();
+ }
+
+ String email = value.toString().trim();
+
+ // Skip validation if empty
+ if (email.isEmpty()) {
+ return ValidationResult.success();
+ }
+
+ // Basic length check
+ if (email.length() > MAX_EMAIL_LENGTH) {
+ return ValidationResult.failure(
+ "Email too long (max " + MAX_EMAIL_LENGTH + " characters): " + email);
+ }
+
+ // Must contain exactly one @ symbol
+ int atIndex = email.indexOf('@');
+ if (atIndex <= 0 || atIndex != email.lastIndexOf('@')) {
+ return ValidationResult.failure("Email must contain exactly one @ symbol: " + email);
+ }
+
+ // Split into local and domain parts
+ String localPart = email.substring(0, atIndex);
+ String domainPart = email.substring(atIndex + 1);
+
+ // Validate local part
+ if (localPart.length() > MAX_LOCAL_PART_LENGTH) {
+ return ValidationResult.failure(
+ "Email local part too long (max "
+ + MAX_LOCAL_PART_LENGTH
+ + " characters): "
+ + email);
+ }
+
+ // Check for dangerous characters (basic security check)
+ if (email.contains("\"")
+ || email.contains("'")
+ || email.contains("`")
+ || email.contains("\0")) {
+ return ValidationResult.failure("Email contains dangerous characters: " + email);
+ }
+
+ // Validate domain part format
+ if (!DOMAIN_PATTERN.matcher(domainPart).matches()) {
+ return ValidationResult.failure("Email domain contains invalid characters: " + email);
+ }
+
+ // Domain must contain at least one dot
+ if (!domainPart.contains(".")) {
+ return ValidationResult.failure("Email domain must contain at least one dot: " + email);
+ }
+
+ return ValidationResult.success();
+ }
+
+ @Override
+ public String getDescription() {
+ return "Practical email validation based on OWASP recommendations";
+ }
+}
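In EmailValidator, `atIndex <= 0` rejects a missing `@`, because `indexOf` returns -1, and also a leading `@` at index 0. `atIndex != email.lastIndexOf('@')` rejects any second `@`. The hypothetical helper below isolates that check for illustration; it is not part of this commit.

```java
/** Hypothetical helper isolating EmailValidator's "exactly one @" check. */
public class AtSignCheck {

    /** True when the string has exactly one '@' and it is not the first character. */
    public static boolean hasSingleAtAfterLocalPart(String email) {
        int atIndex = email.indexOf('@'); // -1 when there is no '@'
        return atIndex > 0 && atIndex == email.lastIndexOf('@');
    }

    public static void main(String[] args) {
        String[] samples = {
            "user@example.com", "userexample.com", "@example.com", "a@b@example.com", "user@"
        };
        for (String sample : samples) {
            System.out.println(sample + " -> " + hasSingleAtAfterLocalPart(sample));
        }
    }
}
```

Note that `user@` passes this particular check. EmailValidator rejects it later, because the empty domain part cannot match `DOMAIN_PATTERN`, whose `+` quantifier requires at least one character.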