This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 46554b0b6a [Feature][Transform-V2] Add FieldEncrypt transform for 
encrypting selected fields (#10361)
46554b0b6a is described below

commit 46554b0b6ae8eb68ccf0a5837f697d6ef3f4fe43
Author: dy102 <[email protected]>
AuthorDate: Fri Feb 27 18:15:38 2026 +0900

    [Feature][Transform-V2] Add FieldEncrypt transform for encrypting selected 
fields (#10361)
---
 docs/en/transforms/encrypt.md                      |  59 ++++
 docs/zh/transforms/encrypt.md                      |  66 +++++
 .../common/exception/CommonErrorCode.java          |   2 +-
 .../connector/TransformSpecificationCheckTest.java |   8 +-
 .../e2e/transform/TestFieldEncryptIT.java          |  57 ++++
 .../test/resources/field_decrypt_transform.conf    |  63 ++++
 .../field_decrypt_transform_multi_table.conf       | 189 ++++++++++++
 .../test/resources/field_encrypt_transform.conf    |  68 +++++
 .../field_encrypt_transform_multi_table.conf       | 209 ++++++++++++++
 .../FieldEncryptMultiCatalogTransform.java}        |  10 +-
 .../transform/encrypt/FieldEncryptTransform.java   | 158 ++++++++++
 .../encrypt/FieldEncryptTransformConfig.java       |  53 ++++
 .../FieldEncryptTransformFactory.java}             |  15 +-
 .../encrypt/encryptor/AesCbcEncryptor.java         | 131 +++++++++
 .../transform/encrypt/encryptor/Encryptor.java     |  28 ++
 .../transform/exception/TransformCommonError.java  |   7 +
 .../exception/TransformCommonErrorCode.java        |   4 +-
 ....java => FilterFieldMultiCatalogTransform.java} |   4 +-
 .../filter/FilterFieldTransformFactory.java        |   2 +-
 .../encrypt/FieldEncryptTransformTest.java         | 321 +++++++++++++++++++++
 20 files changed, 1433 insertions(+), 21 deletions(-)

diff --git a/docs/en/transforms/encrypt.md b/docs/en/transforms/encrypt.md
new file mode 100644
index 0000000000..cfcd479d85
--- /dev/null
+++ b/docs/en/transforms/encrypt.md
@@ -0,0 +1,59 @@
+# Encrypt
+
+> Encrypt transform plugin
+
+## Description
+
+The Encrypt transform plugin is used to encrypt or decrypt specified fields in 
records using a symmetric encryption algorithm.
+
+## Options
+
+| name        | type   | required | default value | description                
       |
+|-------------|--------|----------|---------------|-----------------------------------|
+| `fields`    | Array  | Yes      | -             | List of fields to 
encrypt/decrypt |
+| `algorithm` | String | No       | `AES_CBC`     | Encryption algorithm       
       |
+| `key`       | String | Yes      | -             | Base64-encoded encryption 
key     |
+| `mode`      | String | No       | `ENCRYPT`     | `ENCRYPT`or `DECRYPT`      
       |
+
+### algorithm [string]
+
+Encryption algorithm used by this transform.
+Currently, only `AES_CBC` is supported.
+
+### key [string]
+
+The encryption key must be provided in Base64-encoded format.
+Make sure the key length matches the requirements of the selected algorithm.
+For `AES_CBC`, valid key lengths are 16, 24, or 32 bytes (corresponding to 
AES-128, AES-192, or AES-256).
+
+**Example**
+- `base64:AAAAAAAAAAAAAAAAAAAAAA==`
+- `AAAAAAAAAAAAAAAAAAAAAA==`
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform 
Plugin](common-options.md) for details
+
+## Example
+
+```
+transform {
+  FieldEncrypt {
+       fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "ENCRYPT"
+  }
+}
+```
+
+```
+transform {
+  FieldEncrypt {
+       fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "DECRYPT"
+  }
+}
+```
diff --git a/docs/zh/transforms/encrypt.md b/docs/zh/transforms/encrypt.md
new file mode 100644
index 0000000000..5600f6fb13
--- /dev/null
+++ b/docs/zh/transforms/encrypt.md
@@ -0,0 +1,66 @@
+# Encrypt
+
+> 加密 Transform 插件
+
+## 描述
+
+Encrypt Transform 插件用于使用对称加密算法,对记录中指定的字段进行加密或解密。
+
+## 参数说明
+
+| 参数名         | 类型     | 是否必填 | 默认值       | 描述                         |
+|-------------|--------|------|-----------|----------------------------|
+| `fields`    | Array  | 是    | -         | 需要加密或解密的字段列表               |
+| `algorithm` | String | 否    | `AES_CBC` | 加密算法                       |
+| `key`       | String | 是    | -         | Base64 编码的加密密钥             |
+| `mode`      | String | 否    | `ENCRYPT` | 操作模式:`ENCRYPT` 或 `DECRYPT` |
+
+### algorithm [string]
+
+本 Transform 使用的加密算法。
+目前仅支持 `AES_CBC`。
+
+### key [string]
+
+加密密钥必须以 Base64 编码格式提供。
+请确保密钥长度符合所选加密算法的要求。
+
+对于 `AES_CBC`,支持的密钥长度为 16、24 或 32 字节
+(分别对应 AES-128、AES-192 和 AES-256)。
+
+**示例**
+
+- `base64:AAAAAAAAAAAAAAAAAAAAAA==`
+- `AAAAAAAAAAAAAAAAAAAAAA==`
+
+### common options [string]
+
+Transform 插件的通用参数,请参考 [Transform Plugin](common-options.md)。
+
+## 示例
+
+### 字段加密
+
+```hocon
+transform {
+  FieldEncrypt {
+    fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "encrypt"
+  }
+}
+```
+
+### 字段解密
+
+```hocon
+transform {
+  FieldEncrypt {
+    fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "decrypt"
+  }
+}
+```
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 0cf3e11c3d..d0fb019dc5 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -26,7 +26,6 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
     UNSUPPORTED_DATA_TYPE(
             "COMMON-07", "'<identifier>' unsupported data type '<dataType>' of 
'<field>'"),
     UNSUPPORTED_ENCODING("COMMON-08", "unsupported encoding '<encoding>'"),
-    VALIDATION_FAILED("COMMMON-38", "Data validation failed: '<message>'"),
     CONVERT_TO_SEATUNNEL_TYPE_ERROR(
             "COMMON-16",
             "'<connector>' <type> unsupported convert type '<dataType>' of 
'<field>' to SeaTunnel data type."),
@@ -81,6 +80,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
     KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
     CLOSE_FAILED("COMMON-36", "'<identifier>' close failed."),
     SEATUNNEL_ROW_SERIALIZE_FAILED("COMMON-37", "Seatunnel row serialize 
failed. Row={ '<row>' }"),
+    VALIDATION_FAILED("COMMON-38", "Data validation failed: '<message>'"),
     ;
 
     private final String code;
diff --git 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
index d58e8f083d..ce51350644 100644
--- 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
+++ 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
@@ -31,10 +31,10 @@ import java.util.List;
 import java.util.ServiceLoader;
 
 @Slf4j
-public class TransformSpecificationCheckTest {
+class TransformSpecificationCheckTest {
 
     @Test
-    public void testAllTransformUseFactory() {
+    void testAllTransformUseFactory() {
         ServiceLoader<SeaTunnelTransform> transforms =
                 ServiceLoader.load(
                         SeaTunnelTransform.class, 
Thread.currentThread().getContextClassLoader());
@@ -43,11 +43,11 @@ public class TransformSpecificationCheckTest {
                 FactoryUtil.discoverFactories(
                         Thread.currentThread().getContextClassLoader(),
                         TableTransformFactory.class);
-        Assertions.assertEquals(20, factories.size());
+        Assertions.assertEquals(21, factories.size());
     }
 
     @Test
-    public void testAllTransformSupportMultiTable() {
+    void testAllTransformSupportMultiTable() {
         List<TableTransformFactory> factories =
                 FactoryUtil.discoverFactories(
                         Thread.currentThread().getContextClassLoader(),
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldEncryptIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldEncryptIT.java
new file mode 100644
index 0000000000..5265139941
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldEncryptIT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestFieldEncryptIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void testEncryption(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/field_encrypt_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testDecryption(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/field_decrypt_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testEncryptionMultiTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/field_encrypt_transform_multi_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testDecryptionMultiTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/field_decrypt_transform_multi_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform.conf
new file mode 100644
index 0000000000..52eba3d721
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    string.fake.mode = "template"
+    string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+    schema {
+        fields {
+          id = bigint
+          name = string
+          age = smallint
+        }
+      }
+  }
+}
+
+transform {
+  FieldEncrypt {
+       fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "decrypt"
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        field_rules = [{
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              equals_to = "value1"
+            }
+          ]
+        }]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform_multi_table.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform_multi_table.conf
new file mode 100644
index 0000000000..5fdb1c28c8
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform_multi_table.conf
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        string.fake.mode = "template"
+        string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+        row.num = 100
+        schema = {
+          table = "test.abc"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "address"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        string.fake.mode = "template"
+        string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+        row.num = 100
+        schema = {
+          table = "test.xyz"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      },
+      {
+        string.fake.mode = "template"
+        string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+        row.num = 100
+        schema = {
+          table = "test.www"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "address"
+              type = "string"
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+
+transform {
+  FieldEncrypt {
+       fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "DECRYPT"
+
+    table_transform = [
+      {
+        table_path = "test.abc"
+        fields = ["name", "address"]
+        key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+        mode = "DECRYPT"
+      }
+    ]
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        tables_configs = [
+            {
+                table_path = "test.abc"
+                field_rules = [
+                {
+                  field_name = name
+                  field_type = string
+                  field_value = [
+                    {
+                       equals_to = "value1"
+                    }
+                  ]
+                },
+                {
+                    field_name = address
+                    field_type = string
+                    field_value = [
+                      {
+                         equals_to = "value1"
+                      }
+                    ]
+                }
+               ]
+           },
+          {
+            table_path = "test.xyz"
+            field_rules = [
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                   equals_to = "value1"
+                }
+              ]
+            }
+            ]
+          },
+          {
+            table_path = "test.www"
+            field_rules = [
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                   equals_to = "value1"
+                }
+              ]
+            },
+            {
+                field_name = address
+                field_type = string
+                field_value = [
+                  {
+                     rule_type = MIN_LENGTH
+                     rule_value = 44
+                  },
+                  {
+                     rule_type = MAX_LENGTH
+                     rule_value = 44
+                  }
+                ]
+            }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform.conf
new file mode 100644
index 0000000000..d0abbd98dc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    string.fake.mode = "template"
+    string.template = ["value1"]
+    schema {
+        fields {
+          id = bigint
+          name = string
+          age = smallint
+        }
+      }
+  }
+}
+
+transform {
+  FieldEncrypt {
+       fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "encrypt"
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        field_rules = [{
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+               rule_type = MIN_LENGTH
+               rule_value = 44
+            },
+            {
+               rule_type = MAX_LENGTH
+               rule_value = 44
+            }
+          ]
+        }]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform_multi_table.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform_multi_table.conf
new file mode 100644
index 0000000000..3bb1b9a7c8
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform_multi_table.conf
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        string.fake.mode = "template"
+        string.template = ["value1"]
+        row.num = 100
+        schema = {
+          table = "test.abc"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "address"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        string.fake.mode = "template"
+        string.template = ["value1"]
+        row.num = 100
+        schema = {
+          table = "test.xyz"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      },
+      {
+        string.fake.mode = "template"
+        string.template = ["value1"]
+        row.num = 100
+        schema = {
+          table = "test.www"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "address"
+              type = "string"
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+
+transform {
+  FieldEncrypt {
+       fields = ["name"]
+    key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+    algorithm = "AES_CBC"
+    mode = "ENCRYPT"
+
+    table_transform = [
+      {
+        table_path = "test.abc"
+        fields = ["name", "address"]
+        key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+        mode = "ENCRYPT"
+      }
+    ]
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        tables_configs = [
+            {
+                table_path = "test.abc"
+                field_rules = [
+                {
+                  field_name = name
+                  field_type = string
+                  field_value = [
+                    {
+                       rule_type = MIN_LENGTH
+                       rule_value = 44
+                    },
+                    {
+                       rule_type = MAX_LENGTH
+                       rule_value = 44
+                    }
+                  ]
+                },
+                {
+                    field_name = address
+                    field_type = string
+                    field_value = [
+                      {
+                         rule_type = MIN_LENGTH
+                         rule_value = 44
+                      },
+                      {
+                         rule_type = MAX_LENGTH
+                         rule_value = 44
+                      }
+                    ]
+                }
+               ]
+           },
+          {
+            table_path = "test.xyz"
+            field_rules = [
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                   rule_type = MIN_LENGTH
+                   rule_value = 44
+                },
+                {
+                   rule_type = MAX_LENGTH
+                   rule_value = 44
+                }
+              ]
+            }
+            ]
+          },
+          {
+            table_path = "test.www"
+            field_rules = [
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                   rule_type = MIN_LENGTH
+                   rule_value = 44
+                },
+                {
+                   rule_type = MAX_LENGTH
+                   rule_value = 44
+                }
+              ]
+            },
+            {
+                field_name = address
+                field_type = string
+                field_value = [
+                  {
+                     rule_type = MIN_LENGTH
+                     rule_value = 6
+                  },
+                  {
+                     rule_type = MAX_LENGTH
+                     rule_value = 6
+                  }
+                ]
+            }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptMultiCatalogTransform.java
similarity index 85%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptMultiCatalogTransform.java
index f281b3d855..daf32e4908 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptMultiCatalogTransform.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.filter;
+package org.apache.seatunnel.transform.encrypt;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -26,22 +26,22 @@ import 
org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
-public class FieldFieldMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
+public class FieldEncryptMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
 
-    public FieldFieldMultiCatalogTransform(
+    public FieldEncryptMultiCatalogTransform(
             List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
         super(inputCatalogTables, config);
     }
 
     @Override
     public String getPluginName() {
-        return FilterFieldTransform.PLUGIN_NAME;
+        return FieldEncryptTransform.PLUGIN_NAME;
     }
 
     @Override
     protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
-        return new FilterFieldTransform(config, inputCatalogTable);
+        return new FieldEncryptTransform(config, inputCatalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransform.java
new file mode 100644
index 0000000000..ee599d5b49
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransform.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonError;
+import 
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+import org.apache.seatunnel.transform.encrypt.encryptor.Encryptor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.function.UnaryOperator;
+import java.util.stream.StreamSupport;
+
+public class FieldEncryptTransform extends AbstractCatalogSupportMapTransform {
+    public static final String PLUGIN_NAME = "FieldEncrypt";
+
+    private static final String ENCRYPT = "ENCRYPT";
+    private static final String DECRYPT = "DECRYPT";
+
+    private final List<String> fields = new ArrayList<>();
+    private final String key;
+    private final String encryptAlgorithm;
+    private final String mode;
+    private final int maxFieldLength;
+
+    private transient volatile Encryptor encryptor;
+    private int[] encryptFieldIndexes;
+
+    public FieldEncryptTransform(
+            @NonNull ReadonlyConfig config, @NonNull CatalogTable 
catalogTable) {
+        super(catalogTable);
+
+        this.fields.addAll(config.get(FieldEncryptTransformConfig.FIELDS));
+        this.key = config.get(FieldEncryptTransformConfig.KEY);
+        this.encryptAlgorithm = 
config.get(FieldEncryptTransformConfig.ALGORITHM);
+        this.mode = config.get(FieldEncryptTransformConfig.MODE);
+        this.maxFieldLength = 
config.get(FieldEncryptTransformConfig.MAX_FIELD_LENGTH);
+
+        initializeFieldIndexes();
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        if (encryptor == null) {
+            ServiceLoader<Encryptor> loader = 
ServiceLoader.load(Encryptor.class);
+            Optional<Encryptor> optionalEncryptor =
+                    StreamSupport.stream(loader.spliterator(), false)
+                            .filter(e -> e.support(encryptAlgorithm))
+                            .findFirst();
+
+            if (!optionalEncryptor.isPresent()) {
+                throw CommonError.unsupportedOperation(
+                        PLUGIN_NAME, "Unsupported encrypt algorithm");
+            }
+            this.encryptor = optionalEncryptor.get();
+            this.encryptor.init(this.key);
+        }
+
+        if (ENCRYPT.equalsIgnoreCase(mode)) {
+            return processFields(inputRow, encryptor::encrypt);
+        } else if (DECRYPT.equalsIgnoreCase(mode)) {
+            return processFields(inputRow, encryptor::decrypt);
+        } else {
+            throw CommonError.illegalArgument(mode, "mode only support encrypt 
or decrypt");
+        }
+    }
+
+    private SeaTunnelRow processFields(SeaTunnelRow inputRow, 
UnaryOperator<String> action) {
+        SeaTunnelRow outputRow = inputRow.copy();
+        for (int index : encryptFieldIndexes) {
+            Object field = outputRow.getField(index);
+            if (field == null) {
+                continue;
+            }
+
+            String value = field.toString();
+            if (value.length() > maxFieldLength) {
+                throw CommonError.illegalArgument(
+                        String.valueOf(value.length()),
+                        "Field length exceeds the maximum limit of " + 
maxFieldLength);
+            }
+
+            if (StringUtils.isNotBlank(value)) {
+                outputRow.setField(index, action.apply(value));
+            }
+        }
+        return outputRow;
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return inputCatalogTable.getTableSchema();
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputCatalogTable.getTableId();
+    }
+
+    private void initializeFieldIndexes() {
+        List<Column> columns = inputCatalogTable.getTableSchema().getColumns();
+        encryptFieldIndexes =
+                fields.stream()
+                        .mapToInt(
+                                fieldName -> {
+                                    for (int i = 0; i < columns.size(); i++) {
+                                        if 
(columns.get(i).getName().equals(fieldName)) {
+                                            if (BasicType.STRING_TYPE.equals(
+                                                    
columns.get(i).getDataType())) {
+                                                return i;
+                                            } else {
+                                                throw 
CommonError.unsupportedDataType(
+                                                        PLUGIN_NAME,
+                                                        
columns.get(i).getDataType().toString(),
+                                                        
columns.get(i).getName());
+                                            }
+                                        }
+                                    }
+                                    throw 
TransformCommonError.cannotFindInputFieldError(
+                                            PLUGIN_NAME, fieldName);
+                                })
+                        .toArray();
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformConfig.java
new file mode 100644
index 0000000000..584f6490ed
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.transform.encrypt.encryptor.AesCbcEncryptor;
+
+import java.util.List;
+
+public class FieldEncryptTransformConfig {
+    public static final Option<List<String>> FIELDS =
+            Options.key("fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The list of fields that need to be 
encrypted.");
+
+    public static final Option<String> ALGORITHM =
+            Options.key("algorithm")
+                    .stringType()
+                    .defaultValue(AesCbcEncryptor.IDENTIFIER)
+                    .withDescription("The encryption algorithm, support 
AES_CBC.");
+
+    public static final Option<String> KEY =
+            
Options.key("key").stringType().noDefaultValue().withDescription("The 
encryption key.");
+
+    public static final Option<String> MODE =
+            Options.key("mode")
+                    .stringType()
+                    .defaultValue("encrypt")
+                    .withDescription("The mode of the transform, support 
encrypt and decrypt.");
+
+    public static final Option<Integer> MAX_FIELD_LENGTH =
+            Options.key("max_field_length")
+                    .intType()
+                    .defaultValue(10 * 1024 * 1024) // 10MB
+                    .withDescription("Maximum field length to encrypt");
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformFactory.java
similarity index 77%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformFactory.java
index 3315224905..d12d4292a8 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.filter;
+package org.apache.seatunnel.transform.encrypt;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableTransform;
@@ -26,10 +26,10 @@ import 
org.apache.seatunnel.transform.common.TransformCommonOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.transform.filter.FilterFieldTransform.PLUGIN_NAME;
+import static 
org.apache.seatunnel.transform.encrypt.FieldEncryptTransform.PLUGIN_NAME;
 
 @AutoService(Factory.class)
-public class FilterFieldTransformFactory implements TableTransformFactory {
+public class FieldEncryptTransformFactory implements TableTransformFactory {
     @Override
     public String factoryIdentifier() {
         return PLUGIN_NAME;
@@ -38,9 +38,10 @@ public class FilterFieldTransformFactory implements 
TableTransformFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .optional(
-                        FilterFieldTransformConfig.INCLUDE_FIELDS,
-                        FilterFieldTransformConfig.EXCLUDE_FIELDS)
+                .required(FieldEncryptTransformConfig.FIELDS)
+                .required(FieldEncryptTransformConfig.KEY)
+                .optional(FieldEncryptTransformConfig.ALGORITHM)
+                .optional(FieldEncryptTransformConfig.MODE)
                 .optional(TransformCommonOptions.MULTI_TABLES)
                 .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
                 .build();
@@ -49,7 +50,7 @@ public class FilterFieldTransformFactory implements 
TableTransformFactory {
     @Override
     public TableTransform createTransform(TableTransformFactoryContext 
context) {
         return () ->
-                new FieldFieldMultiCatalogTransform(
+                new FieldEncryptMultiCatalogTransform(
                         context.getCatalogTables(), context.getOptions());
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/AesCbcEncryptor.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/AesCbcEncryptor.java
new file mode 100644
index 0000000000..33b137d5ed
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/AesCbcEncryptor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt.encryptor;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import com.google.auto.service.AutoService;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+
+@AutoService(Encryptor.class)
+public class AesCbcEncryptor implements Encryptor {
+    public static final String IDENTIFIER = "AES_CBC";
+
+    private static final int IV_SIZE = 16;
+    private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+    private static final String ALGORITHM = "AES/CBC/PKCS5Padding";
+
+    private SecretKeySpec keySpec;
+
+    @Override
+    public boolean support(String algorithm) {
+        return IDENTIFIER.equals(algorithm);
+    }
+
+    @Override
+    public void init(String key) {
+        this.keySpec = buildAesKey(key);
+    }
+
+    @Override
+    public String encrypt(String plainText) {
+        byte[] iv = new byte[IV_SIZE];
+        SECURE_RANDOM.nextBytes(iv);
+
+        IvParameterSpec ivSpec = new IvParameterSpec(iv);
+
+        byte[] encrypted;
+        try {
+            Cipher cipher = Cipher.getInstance(ALGORITHM);
+            cipher.init(Cipher.ENCRYPT_MODE, keySpec, ivSpec);
+            encrypted = 
cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            throw TransformCommonError.encryptionError("plaintext length:" + 
plainText.length(), e);
+        }
+
+        byte[] encryptedWithIv = new byte[IV_SIZE + encrypted.length];
+        System.arraycopy(iv, 0, encryptedWithIv, 0, IV_SIZE);
+        System.arraycopy(encrypted, 0, encryptedWithIv, IV_SIZE, 
encrypted.length);
+
+        return Base64.getEncoder().encodeToString(encryptedWithIv);
+    }
+
+    @Override
+    public String decrypt(String cipherText) {
+        byte[] decoded = Base64.getDecoder().decode(cipherText);
+        byte[] iv = new byte[IV_SIZE];
+        if (decoded.length < IV_SIZE) {
+            throw CommonError.illegalArgument(cipherText, "Invalid encrypted 
value");
+        }
+        byte[] encrypted = new byte[decoded.length - IV_SIZE];
+
+        System.arraycopy(decoded, 0, iv, 0, IV_SIZE);
+        System.arraycopy(decoded, IV_SIZE, encrypted, 0, encrypted.length);
+
+        IvParameterSpec ivSpec = new IvParameterSpec(iv);
+
+        byte[] original;
+        try {
+            Cipher cipher = Cipher.getInstance(ALGORITHM);
+            cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
+            original = cipher.doFinal(encrypted);
+        } catch (Exception e) {
+            throw TransformCommonError.encryptionError(
+                    "ciphertext length:" + cipherText.length(), e);
+        }
+
+        return new String(original, StandardCharsets.UTF_8);
+    }
+
+    private SecretKeySpec buildAesKey(String key) {
+        if (key == null || key.trim().isEmpty()) {
+            throw CommonError.illegalArgument(key, "Encryption key cannot be 
null or empty");
+        }
+
+        String base64 = key;
+        if (key.startsWith("base64:")) {
+            base64 = key.substring("base64:".length());
+        }
+        base64 = base64.trim();
+
+        byte[] keyBytes;
+        try {
+            keyBytes = Base64.getDecoder().decode(base64);
+        } catch (IllegalArgumentException e) {
+            throw CommonError.illegalArgument(key, "Invalid Base64 encoding in 
encryption key");
+        }
+
+        if (!(keyBytes.length == 16 || keyBytes.length == 24 || 
keyBytes.length == 32)) {
+            throw CommonError.illegalArgument(
+                    key,
+                    "Invalid AES key length: "
+                            + keyBytes.length
+                            + ". Expected 16, 24, or 32 bytes");
+        }
+
+        return new SecretKeySpec(keyBytes, "AES");
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/Encryptor.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/Encryptor.java
new file mode 100644
index 0000000000..28404417d9
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/Encryptor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt.encryptor;
+
+public interface Encryptor {
+    boolean support(String algorithm);
+
+    void init(String key);
+
+    String encrypt(String plainText);
+
+    String decrypt(String cipherText);
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
index 66614d47d8..65f3bc5a1a 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.transform.exception;
 
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.apache.commons.collections4.map.SingletonMap;
 
@@ -26,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.ENCRYPTION_FAILED;
 import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.EXPRESSION_EXECUTE_ERROR;
 import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
 import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
@@ -89,4 +91,9 @@ public class TransformCommonError {
         Map<String, String> params = new SingletonMap<>("message", message);
         return new TransformException(CommonErrorCode.VALIDATION_FAILED, 
params);
     }
+
+    public static SeaTunnelRuntimeException encryptionError(String field, 
Throwable cause) {
+        Map<String, String> params = new SingletonMap<>("field", field);
+        return new TransformException(ENCRYPTION_FAILED, params, cause);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index 20a2c30bf8..bccbc6b61e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -39,7 +39,9 @@ public enum TransformCommonErrorCode implements 
SeaTunnelErrorCode {
             "TRANSFORM_COMMON-06", "The expression '<expression>' of SQL 
transform execute failed"),
     WHERE_STATEMENT_ERROR(
             "TRANSFORM_COMMON-07",
-            "The where statement '<wherebody>' of SQL transform execute 
failed");
+            "The where statement '<wherebody>' of SQL transform execute 
failed"),
+    ENCRYPTION_FAILED("TRANSFORM_COMMON-08", "Field '<field>' encryption 
failed."),
+    ;
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldMultiCatalogTransform.java
similarity index 93%
rename from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
rename to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldMultiCatalogTransform.java
index f281b3d855..aabb61c42b 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldMultiCatalogTransform.java
@@ -26,9 +26,9 @@ import 
org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
-public class FieldFieldMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
+public class FilterFieldMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
 
-    public FieldFieldMultiCatalogTransform(
+    public FilterFieldMultiCatalogTransform(
             List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
         super(inputCatalogTables, config);
     }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index 3315224905..f390f48424 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -49,7 +49,7 @@ public class FilterFieldTransformFactory implements 
TableTransformFactory {
     @Override
     public TableTransform createTransform(TableTransformFactoryContext 
context) {
         return () ->
-                new FieldFieldMultiCatalogTransform(
+                new FilterFieldMultiCatalogTransform(
                         context.getCatalogTables(), context.getOptions());
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformTest.java
new file mode 100644
index 0000000000..3f85c822c2
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class FieldEncryptTransformTest {
+    public static final String KEY = "base64:AAAAAAAAAAAAAAAAAAAAAA==";
+    private static CatalogTable catalogTable;
+    private static Object[] values;
+    private static Object[] original;
+    private List<String> encryptFields = Arrays.asList("key2", "key3");
+
+    @BeforeAll
+    static void setUp() {
+        catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("catalog", TablePath.DEFAULT),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key1",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key2",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key3",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key4",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key5",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .build(),
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "comment");
+        values = new Object[] {"value1", "value2", "value3", "value4", 
"value5"};
+        original = Arrays.copyOf(values, values.length);
+    }
+
+    @Test
+    void testEncryption() {
+        SeaTunnelRow output = encryption();
+        for (int i = 0; i < original.length; i++) {
+            if (i == 1 || i == 2) {
+                Assertions.assertNotEquals(original[i], output.getField(i));
+            } else {
+                Assertions.assertEquals(original[i], output.getField(i));
+            }
+        }
+    }
+
+    @Test
+    void testDecryption() {
+        SeaTunnelRow output = encryption();
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+        configMap.put(FieldEncryptTransformConfig.MODE.key(), "decrypt");
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        SeaTunnelRow input = new SeaTunnelRow(output.getFields());
+        SeaTunnelRow decryptedRow = fieldEncryptTransform.transformRow(input);
+        Assertions.assertNotNull(decryptedRow);
+        Assertions.assertEquals("value2", decryptedRow.getField(1));
+        Assertions.assertEquals("value3", decryptedRow.getField(2));
+    }
+
+    @Test
+    void testNullField() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        Object[] valuesWithNull = new Object[] {"value1", null, "value3", 
"value4", "value5"};
+        SeaTunnelRow input = new SeaTunnelRow(valuesWithNull);
+        SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+        Assertions.assertNull(output.getField(1));
+        Assertions.assertNotNull(output.getField(2));
+    }
+
+    @Test
+    void testEmptyString() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        Object[] valuesWithEmpty = new Object[] {"value1", "", "   ", 
"value4", "value5"};
+        SeaTunnelRow input = new SeaTunnelRow(valuesWithEmpty);
+        SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+        Assertions.assertEquals("", output.getField(1));
+        Assertions.assertEquals("   ", output.getField(2));
+    }
+
+    @Test
+    void testFieldNotFound() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), 
Arrays.asList("nonExistentField"));
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+        Assertions.assertThrows(
+                TransformException.class,
+                () -> new 
FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), catalogTable));
+    }
+
+    @Test
+    void testInvalidKeyLength() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), 
"base64:AAAAAAA=");
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+        SeaTunnelRow input = new SeaTunnelRow(values);
+
+        Assertions.assertThrows(
+                SeaTunnelRuntimeException.class, () -> 
fieldEncryptTransform.transformRow(input));
+    }
+
+    @Test
+    void testUnsupportedAlgorithm() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+        configMap.put(FieldEncryptTransformConfig.ALGORITHM.key(), 
"INVALID_ALGORITHM");
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+        SeaTunnelRow input = new SeaTunnelRow(values);
+
+        Assertions.assertThrows(
+                SeaTunnelRuntimeException.class,
+                () -> {
+                    fieldEncryptTransform.transformRow(input);
+                });
+    }
+
+    @Test
+    void testInvalidMode() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+        configMap.put(FieldEncryptTransformConfig.MODE.key(), "invalid_mode");
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+        SeaTunnelRow input = new SeaTunnelRow(values);
+
+        Assertions.assertThrows(
+                SeaTunnelRuntimeException.class,
+                () -> {
+                    fieldEncryptTransform.transformRow(input);
+                });
+    }
+
+    @Test
+    void testNonStringField() {
+        CatalogTable intCatalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("catalog", TablePath.DEFAULT),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key1",
+                                                BasicType.INT_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .build(),
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "comment");
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), 
Arrays.asList("key1"));
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+        Assertions.assertThrows(
+                SeaTunnelRuntimeException.class,
+                () ->
+                        new FieldEncryptTransform(
+                                ReadonlyConfig.fromMap(configMap), 
intCatalogTable));
+    }
+
+    @Test
+    void testFieldExceedsMaxLength() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+        configMap.put(FieldEncryptTransformConfig.MAX_FIELD_LENGTH.key(), 10);
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        Object[] oversizedValues =
+                new Object[] {"value1", "thisvalueiswaytoolong", "value3", 
"value4", "value5"};
+        SeaTunnelRow input = new SeaTunnelRow(oversizedValues);
+
+        Assertions.assertThrows(
+                SeaTunnelRuntimeException.class, () -> 
fieldEncryptTransform.transformRow(input));
+    }
+
+    @Test
+    void testFieldExactlyMaxLength() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+        configMap.put(FieldEncryptTransformConfig.MAX_FIELD_LENGTH.key(), 6);
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        Object[] exactValues = new Object[] {"value1", "value2", "value3", 
"value4", "value5"};
+        SeaTunnelRow input = new SeaTunnelRow(exactValues);
+        SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+        Assertions.assertNotNull(output);
+        Assertions.assertNotEquals("value2", output.getField(1));
+    }
+
+    @Test
+    void testMaxFieldLengthWithNullField() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+        configMap.put(FieldEncryptTransformConfig.MAX_FIELD_LENGTH.key(), 3);
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        Object[] valuesWithNull = new Object[] {"value1", null, "val", 
"value4", "value5"};
+        SeaTunnelRow input = new SeaTunnelRow(valuesWithNull);
+        SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+        Assertions.assertNull(output.getField(1));
+    }
+
+    private SeaTunnelRow encryption() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+        configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+        FieldEncryptTransform fieldEncryptTransform =
+                new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), 
catalogTable);
+
+        SeaTunnelRow input = new SeaTunnelRow(values);
+        return fieldEncryptTransform.transformRow(input);
+    }
+}

Reply via email to