This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 46554b0b6a [Feature][Transform-V2] Add FieldEncrypt transform for
encrypting selected fields (#10361)
46554b0b6a is described below
commit 46554b0b6ae8eb68ccf0a5837f697d6ef3f4fe43
Author: dy102 <[email protected]>
AuthorDate: Fri Feb 27 18:15:38 2026 +0900
[Feature][Transform-V2] Add FieldEncrypt transform for encrypting selected
fields (#10361)
---
docs/en/transforms/encrypt.md | 59 ++++
docs/zh/transforms/encrypt.md | 66 +++++
.../common/exception/CommonErrorCode.java | 2 +-
.../connector/TransformSpecificationCheckTest.java | 8 +-
.../e2e/transform/TestFieldEncryptIT.java | 57 ++++
.../test/resources/field_decrypt_transform.conf | 63 ++++
.../field_decrypt_transform_multi_table.conf | 189 ++++++++++++
.../test/resources/field_encrypt_transform.conf | 68 +++++
.../field_encrypt_transform_multi_table.conf | 209 ++++++++++++++
.../FieldEncryptMultiCatalogTransform.java} | 10 +-
.../transform/encrypt/FieldEncryptTransform.java | 158 ++++++++++
.../encrypt/FieldEncryptTransformConfig.java | 53 ++++
.../FieldEncryptTransformFactory.java} | 15 +-
.../encrypt/encryptor/AesCbcEncryptor.java | 131 +++++++++
.../transform/encrypt/encryptor/Encryptor.java | 28 ++
.../transform/exception/TransformCommonError.java | 7 +
.../exception/TransformCommonErrorCode.java | 4 +-
....java => FilterFieldMultiCatalogTransform.java} | 4 +-
.../filter/FilterFieldTransformFactory.java | 2 +-
.../encrypt/FieldEncryptTransformTest.java | 321 +++++++++++++++++++++
20 files changed, 1433 insertions(+), 21 deletions(-)
diff --git a/docs/en/transforms/encrypt.md b/docs/en/transforms/encrypt.md
new file mode 100644
index 0000000000..cfcd479d85
--- /dev/null
+++ b/docs/en/transforms/encrypt.md
@@ -0,0 +1,59 @@
+# Encrypt
+
+> Encrypt transform plugin
+
+## Description
+
+The Encrypt transform plugin is used to encrypt or decrypt specified fields in
records using a symmetric encryption algorithm.
+
+## Options
+
+| name | type | required | default value | description
|
+|-------------|--------|----------|---------------|-----------------------------------|
+| `fields` | Array | Yes | - | List of fields to
encrypt/decrypt |
+| `algorithm` | String | No | `AES_CBC` | Encryption algorithm
|
+| `key` | String | Yes | - | Base64-encoded encryption
key |
+| `mode` | String | No | `ENCRYPT` | `ENCRYPT`or `DECRYPT`
|
+
+### algorithm [string]
+
+Encryption algorithm used by this transform.
+Currently, only `AES_CBC` is supported.
+
+### key [string]
+
+The encryption key must be provided in Base64-encoded format.
+Make sure the key length matches the requirements of the selected algorithm.
+For `AES_CBC`, valid key lengths are 16, 24, or 32 bytes (corresponding to
AES-128, AES-192, or AES-256).
+
+**Example**
+- `base64:AAAAAAAAAAAAAAAAAAAAAA==`
+- `AAAAAAAAAAAAAAAAAAAAAA==`
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform
Plugin](common-options.md) for details
+
+## Example
+
+```
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "ENCRYPT"
+ }
+}
+```
+
+```
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "DECRYPT"
+ }
+}
+```
diff --git a/docs/zh/transforms/encrypt.md b/docs/zh/transforms/encrypt.md
new file mode 100644
index 0000000000..5600f6fb13
--- /dev/null
+++ b/docs/zh/transforms/encrypt.md
@@ -0,0 +1,66 @@
+# Encrypt
+
+> 加密 Transform 插件
+
+## 描述
+
+Encrypt Transform 插件用于使用对称加密算法,对记录中指定的字段进行加密或解密。
+
+## 参数说明
+
+| 参数名 | 类型 | 是否必填 | 默认值 | 描述 |
+|-------------|--------|------|-----------|----------------------------|
+| `fields` | Array | 是 | - | 需要加密或解密的字段列表 |
+| `algorithm` | String | 否 | `AES_CBC` | 加密算法 |
+| `key` | String | 是 | - | Base64 编码的加密密钥 |
+| `mode` | String | 否 | `ENCRYPT` | 操作模式:`ENCRYPT` 或 `DECRYPT` |
+
+### algorithm [string]
+
+本 Transform 使用的加密算法。
+目前仅支持 `AES_CBC`。
+
+### key [string]
+
+加密密钥必须以 Base64 编码格式提供。
+请确保密钥长度符合所选加密算法的要求。
+
+对于 `AES_CBC`,支持的密钥长度为 16、24 或 32 字节
+(分别对应 AES-128、AES-192 和 AES-256)。
+
+**示例**
+
+- `base64:AAAAAAAAAAAAAAAAAAAAAA==`
+- `AAAAAAAAAAAAAAAAAAAAAA==`
+
+### common options [string]
+
+Transform 插件的通用参数,请参考 [Transform Plugin](common-options.md)。
+
+## 示例
+
+### 字段加密
+
+```hocon
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "encrypt"
+ }
+}
+```
+
+### 字段解密
+
+```hocon
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "decrypt"
+ }
+}
+```
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 0cf3e11c3d..d0fb019dc5 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -26,7 +26,6 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of
'<field>'"),
UNSUPPORTED_ENCODING("COMMON-08", "unsupported encoding '<encoding>'"),
- VALIDATION_FAILED("COMMMON-38", "Data validation failed: '<message>'"),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
"COMMON-16",
"'<connector>' <type> unsupported convert type '<dataType>' of
'<field>' to SeaTunnel data type."),
@@ -81,6 +80,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
CLOSE_FAILED("COMMON-36", "'<identifier>' close failed."),
SEATUNNEL_ROW_SERIALIZE_FAILED("COMMON-37", "Seatunnel row serialize
failed. Row={ '<row>' }"),
+ VALIDATION_FAILED("COMMON-38", "Data validation failed: '<message>'"),
;
private final String code;
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
index d58e8f083d..ce51350644 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
@@ -31,10 +31,10 @@ import java.util.List;
import java.util.ServiceLoader;
@Slf4j
-public class TransformSpecificationCheckTest {
+class TransformSpecificationCheckTest {
@Test
- public void testAllTransformUseFactory() {
+ void testAllTransformUseFactory() {
ServiceLoader<SeaTunnelTransform> transforms =
ServiceLoader.load(
SeaTunnelTransform.class,
Thread.currentThread().getContextClassLoader());
@@ -43,11 +43,11 @@ public class TransformSpecificationCheckTest {
FactoryUtil.discoverFactories(
Thread.currentThread().getContextClassLoader(),
TableTransformFactory.class);
- Assertions.assertEquals(20, factories.size());
+ Assertions.assertEquals(21, factories.size());
}
@Test
- public void testAllTransformSupportMultiTable() {
+ void testAllTransformSupportMultiTable() {
List<TableTransformFactory> factories =
FactoryUtil.discoverFactories(
Thread.currentThread().getContextClassLoader(),
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldEncryptIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldEncryptIT.java
new file mode 100644
index 0000000000..5265139941
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldEncryptIT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestFieldEncryptIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testEncryption(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/field_encrypt_transform.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testDecryption(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/field_decrypt_transform.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testEncryptionMultiTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/field_encrypt_transform_multi_table.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testDecryptionMultiTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/field_decrypt_transform_multi_table.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform.conf
new file mode 100644
index 0000000000..52eba3d721
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ string.fake.mode = "template"
+ string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+ schema {
+ fields {
+ id = bigint
+ name = string
+ age = smallint
+ }
+ }
+ }
+}
+
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "decrypt"
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ field_rules = [{
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ equals_to = "value1"
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform_multi_table.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform_multi_table.conf
new file mode 100644
index 0000000000..5fdb1c28c8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_decrypt_transform_multi_table.conf
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ string.fake.mode = "template"
+ string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+ row.num = 100
+ schema = {
+ table = "test.abc"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "address"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ string.fake.mode = "template"
+ string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+ row.num = 100
+ schema = {
+ table = "test.xyz"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ },
+ {
+ string.fake.mode = "template"
+ string.template = ["fiXRwCuTG+B0PdQfEzvML589AF/uveSHemzy3KH/Mas="]
+ row.num = 100
+ schema = {
+ table = "test.www"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "address"
+ type = "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "DECRYPT"
+
+ table_transform = [
+ {
+ table_path = "test.abc"
+ fields = ["name", "address"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ mode = "DECRYPT"
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.abc"
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ equals_to = "value1"
+ }
+ ]
+ },
+ {
+ field_name = address
+ field_type = string
+ field_value = [
+ {
+ equals_to = "value1"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "test.xyz"
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ equals_to = "value1"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "test.www"
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ equals_to = "value1"
+ }
+ ]
+ },
+ {
+ field_name = address
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 44
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 44
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform.conf
new file mode 100644
index 0000000000..d0abbd98dc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ string.fake.mode = "template"
+ string.template = ["value1"]
+ schema {
+ fields {
+ id = bigint
+ name = string
+ age = smallint
+ }
+ }
+ }
+}
+
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "encrypt"
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ field_rules = [{
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 44
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 44
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform_multi_table.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform_multi_table.conf
new file mode 100644
index 0000000000..3bb1b9a7c8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_encrypt_transform_multi_table.conf
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ string.fake.mode = "template"
+ string.template = ["value1"]
+ row.num = 100
+ schema = {
+ table = "test.abc"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "address"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ string.fake.mode = "template"
+ string.template = ["value1"]
+ row.num = 100
+ schema = {
+ table = "test.xyz"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ },
+ {
+ string.fake.mode = "template"
+ string.template = ["value1"]
+ row.num = 100
+ schema = {
+ table = "test.www"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "address"
+ type = "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+
+transform {
+ FieldEncrypt {
+ fields = ["name"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ algorithm = "AES_CBC"
+ mode = "ENCRYPT"
+
+ table_transform = [
+ {
+ table_path = "test.abc"
+ fields = ["name", "address"]
+ key = "base64:AAAAAAAAAAAAAAAAAAAAAA=="
+ mode = "ENCRYPT"
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.abc"
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 44
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 44
+ }
+ ]
+ },
+ {
+ field_name = address
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 44
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 44
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "test.xyz"
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 44
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 44
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "test.www"
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 44
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 44
+ }
+ ]
+ },
+ {
+ field_name = address
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 6
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 6
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptMultiCatalogTransform.java
similarity index 85%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptMultiCatalogTransform.java
index f281b3d855..daf32e4908 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptMultiCatalogTransform.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.filter;
+package org.apache.seatunnel.transform.encrypt;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -26,22 +26,22 @@ import
org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
-public class FieldFieldMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
+public class FieldEncryptMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
- public FieldFieldMultiCatalogTransform(
+ public FieldEncryptMultiCatalogTransform(
List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
super(inputCatalogTables, config);
}
@Override
public String getPluginName() {
- return FilterFieldTransform.PLUGIN_NAME;
+ return FieldEncryptTransform.PLUGIN_NAME;
}
@Override
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config) {
- return new FilterFieldTransform(config, inputCatalogTable);
+ return new FieldEncryptTransform(config, inputCatalogTable);
}
@Override
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransform.java
new file mode 100644
index 0000000000..ee599d5b49
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransform.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonError;
+import
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+import org.apache.seatunnel.transform.encrypt.encryptor.Encryptor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.function.UnaryOperator;
+import java.util.stream.StreamSupport;
+
+public class FieldEncryptTransform extends AbstractCatalogSupportMapTransform {
+ public static final String PLUGIN_NAME = "FieldEncrypt";
+
+ private static final String ENCRYPT = "ENCRYPT";
+ private static final String DECRYPT = "DECRYPT";
+
+ private final List<String> fields = new ArrayList<>();
+ private final String key;
+ private final String encryptAlgorithm;
+ private final String mode;
+ private final int maxFieldLength;
+
+ private transient volatile Encryptor encryptor;
+ private int[] encryptFieldIndexes;
+
+ public FieldEncryptTransform(
+ @NonNull ReadonlyConfig config, @NonNull CatalogTable
catalogTable) {
+ super(catalogTable);
+
+ this.fields.addAll(config.get(FieldEncryptTransformConfig.FIELDS));
+ this.key = config.get(FieldEncryptTransformConfig.KEY);
+ this.encryptAlgorithm =
config.get(FieldEncryptTransformConfig.ALGORITHM);
+ this.mode = config.get(FieldEncryptTransformConfig.MODE);
+ this.maxFieldLength =
config.get(FieldEncryptTransformConfig.MAX_FIELD_LENGTH);
+
+ initializeFieldIndexes();
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ if (encryptor == null) {
+ ServiceLoader<Encryptor> loader =
ServiceLoader.load(Encryptor.class);
+ Optional<Encryptor> optionalEncryptor =
+ StreamSupport.stream(loader.spliterator(), false)
+ .filter(e -> e.support(encryptAlgorithm))
+ .findFirst();
+
+ if (!optionalEncryptor.isPresent()) {
+ throw CommonError.unsupportedOperation(
+ PLUGIN_NAME, "Unsupported encrypt algorithm");
+ }
+ this.encryptor = optionalEncryptor.get();
+ this.encryptor.init(this.key);
+ }
+
+ if (ENCRYPT.equalsIgnoreCase(mode)) {
+ return processFields(inputRow, encryptor::encrypt);
+ } else if (DECRYPT.equalsIgnoreCase(mode)) {
+ return processFields(inputRow, encryptor::decrypt);
+ } else {
+ throw CommonError.illegalArgument(mode, "mode only support encrypt
or decrypt");
+ }
+ }
+
+ private SeaTunnelRow processFields(SeaTunnelRow inputRow,
UnaryOperator<String> action) {
+ SeaTunnelRow outputRow = inputRow.copy();
+ for (int index : encryptFieldIndexes) {
+ Object field = outputRow.getField(index);
+ if (field == null) {
+ continue;
+ }
+
+ String value = field.toString();
+ if (value.length() > maxFieldLength) {
+ throw CommonError.illegalArgument(
+ String.valueOf(value.length()),
+ "Field length exceeds the maximum limit of " +
maxFieldLength);
+ }
+
+ if (StringUtils.isNotBlank(value)) {
+ outputRow.setField(index, action.apply(value));
+ }
+ }
+ return outputRow;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return inputCatalogTable.getTableSchema();
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputCatalogTable.getTableId();
+ }
+
+ private void initializeFieldIndexes() {
+ List<Column> columns = inputCatalogTable.getTableSchema().getColumns();
+ encryptFieldIndexes =
+ fields.stream()
+ .mapToInt(
+ fieldName -> {
+ for (int i = 0; i < columns.size(); i++) {
+ if
(columns.get(i).getName().equals(fieldName)) {
+ if (BasicType.STRING_TYPE.equals(
+
columns.get(i).getDataType())) {
+ return i;
+ } else {
+ throw
CommonError.unsupportedDataType(
+ PLUGIN_NAME,
+
columns.get(i).getDataType().toString(),
+
columns.get(i).getName());
+ }
+ }
+ }
+ throw
TransformCommonError.cannotFindInputFieldError(
+ PLUGIN_NAME, fieldName);
+ })
+ .toArray();
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformConfig.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformConfig.java
new file mode 100644
index 0000000000..584f6490ed
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.transform.encrypt.encryptor.AesCbcEncryptor;
+
+import java.util.List;
+
+public class FieldEncryptTransformConfig {
+ public static final Option<List<String>> FIELDS =
+ Options.key("fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The list of fields that need to be
encrypted.");
+
+ public static final Option<String> ALGORITHM =
+ Options.key("algorithm")
+ .stringType()
+ .defaultValue(AesCbcEncryptor.IDENTIFIER)
+ .withDescription("The encryption algorithm, support
AES_CBC.");
+
+ public static final Option<String> KEY =
+
Options.key("key").stringType().noDefaultValue().withDescription("The
encryption key.");
+
+ public static final Option<String> MODE =
+ Options.key("mode")
+ .stringType()
+ .defaultValue("encrypt")
+ .withDescription("The mode of the transform, support
encrypt and decrypt.");
+
+ public static final Option<Integer> MAX_FIELD_LENGTH =
+ Options.key("max_field_length")
+ .intType()
+ .defaultValue(10 * 1024 * 1024) // 10MB
+ .withDescription("Maximum field length to encrypt");
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformFactory.java
similarity index 77%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformFactory.java
index 3315224905..d12d4292a8 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.filter;
+package org.apache.seatunnel.transform.encrypt;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableTransform;
@@ -26,10 +26,10 @@ import
org.apache.seatunnel.transform.common.TransformCommonOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.transform.filter.FilterFieldTransform.PLUGIN_NAME;
+import static
org.apache.seatunnel.transform.encrypt.FieldEncryptTransform.PLUGIN_NAME;
@AutoService(Factory.class)
-public class FilterFieldTransformFactory implements TableTransformFactory {
+public class FieldEncryptTransformFactory implements TableTransformFactory {
@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
@@ -38,9 +38,10 @@ public class FilterFieldTransformFactory implements
TableTransformFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .optional(
- FilterFieldTransformConfig.INCLUDE_FIELDS,
- FilterFieldTransformConfig.EXCLUDE_FIELDS)
+ .required(FieldEncryptTransformConfig.FIELDS)
+ .required(FieldEncryptTransformConfig.KEY)
+ .optional(FieldEncryptTransformConfig.ALGORITHM)
+ .optional(FieldEncryptTransformConfig.MODE)
.optional(TransformCommonOptions.MULTI_TABLES)
.optional(TransformCommonOptions.TABLE_MATCH_REGEX)
.build();
@@ -49,7 +50,7 @@ public class FilterFieldTransformFactory implements
TableTransformFactory {
@Override
public TableTransform createTransform(TableTransformFactoryContext
context) {
return () ->
- new FieldFieldMultiCatalogTransform(
+ new FieldEncryptMultiCatalogTransform(
context.getCatalogTables(), context.getOptions());
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/AesCbcEncryptor.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/AesCbcEncryptor.java
new file mode 100644
index 0000000000..33b137d5ed
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/AesCbcEncryptor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt.encryptor;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import com.google.auto.service.AutoService;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+
+@AutoService(Encryptor.class)
+public class AesCbcEncryptor implements Encryptor {
+ public static final String IDENTIFIER = "AES_CBC";
+
+ private static final int IV_SIZE = 16;
+ private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+ private static final String ALGORITHM = "AES/CBC/PKCS5Padding";
+
+ private SecretKeySpec keySpec;
+
+ @Override
+ public boolean support(String algorithm) {
+ return IDENTIFIER.equals(algorithm);
+ }
+
+ @Override
+ public void init(String key) {
+ this.keySpec = buildAesKey(key);
+ }
+
+ @Override
+ public String encrypt(String plainText) {
+ byte[] iv = new byte[IV_SIZE];
+ SECURE_RANDOM.nextBytes(iv);
+
+ IvParameterSpec ivSpec = new IvParameterSpec(iv);
+
+ byte[] encrypted;
+ try {
+ Cipher cipher = Cipher.getInstance(ALGORITHM);
+ cipher.init(Cipher.ENCRYPT_MODE, keySpec, ivSpec);
+ encrypted =
cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
+ } catch (Exception e) {
+ throw TransformCommonError.encryptionError("plaintext length:" +
plainText.length(), e);
+ }
+
+ byte[] encryptedWithIv = new byte[IV_SIZE + encrypted.length];
+ System.arraycopy(iv, 0, encryptedWithIv, 0, IV_SIZE);
+ System.arraycopy(encrypted, 0, encryptedWithIv, IV_SIZE,
encrypted.length);
+
+ return Base64.getEncoder().encodeToString(encryptedWithIv);
+ }
+
+ @Override
+ public String decrypt(String cipherText) {
+ byte[] decoded = Base64.getDecoder().decode(cipherText);
+ byte[] iv = new byte[IV_SIZE];
+ if (decoded.length < IV_SIZE) {
+ throw CommonError.illegalArgument(cipherText, "Invalid encrypted
value");
+ }
+ byte[] encrypted = new byte[decoded.length - IV_SIZE];
+
+ System.arraycopy(decoded, 0, iv, 0, IV_SIZE);
+ System.arraycopy(decoded, IV_SIZE, encrypted, 0, encrypted.length);
+
+ IvParameterSpec ivSpec = new IvParameterSpec(iv);
+
+ byte[] original;
+ try {
+ Cipher cipher = Cipher.getInstance(ALGORITHM);
+ cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
+ original = cipher.doFinal(encrypted);
+ } catch (Exception e) {
+ throw TransformCommonError.encryptionError(
+ "ciphertext length:" + cipherText.length(), e);
+ }
+
+ return new String(original, StandardCharsets.UTF_8);
+ }
+
+ private SecretKeySpec buildAesKey(String key) {
+ if (key == null || key.trim().isEmpty()) {
+ throw CommonError.illegalArgument(key, "Encryption key cannot be
null or empty");
+ }
+
+ String base64 = key;
+ if (key.startsWith("base64:")) {
+ base64 = key.substring("base64:".length());
+ }
+ base64 = base64.trim();
+
+ byte[] keyBytes;
+ try {
+ keyBytes = Base64.getDecoder().decode(base64);
+ } catch (IllegalArgumentException e) {
+ throw CommonError.illegalArgument(key, "Invalid Base64 encoding in
encryption key");
+ }
+
+ if (!(keyBytes.length == 16 || keyBytes.length == 24 ||
keyBytes.length == 32)) {
+ throw CommonError.illegalArgument(
+ key,
+ "Invalid AES key length: "
+ + keyBytes.length
+ + ". Expected 16, 24, or 32 bytes");
+ }
+
+ return new SecretKeySpec(keyBytes, "AES");
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/Encryptor.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/Encryptor.java
new file mode 100644
index 0000000000..28404417d9
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/encrypt/encryptor/Encryptor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt.encryptor;
+
+public interface Encryptor {
+ boolean support(String algorithm);
+
+ void init(String key);
+
+ String encrypt(String plainText);
+
+ String decrypt(String cipherText);
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
index 66614d47d8..65f3bc5a1a 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.transform.exception;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.commons.collections4.map.SingletonMap;
@@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.ENCRYPTION_FAILED;
import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.EXPRESSION_EXECUTE_ERROR;
import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
@@ -89,4 +91,9 @@ public class TransformCommonError {
Map<String, String> params = new SingletonMap<>("message", message);
return new TransformException(CommonErrorCode.VALIDATION_FAILED,
params);
}
+
+ public static SeaTunnelRuntimeException encryptionError(String field,
Throwable cause) {
+ Map<String, String> params = new SingletonMap<>("field", field);
+ return new TransformException(ENCRYPTION_FAILED, params, cause);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index 20a2c30bf8..bccbc6b61e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -39,7 +39,9 @@ public enum TransformCommonErrorCode implements
SeaTunnelErrorCode {
"TRANSFORM_COMMON-06", "The expression '<expression>' of SQL
transform execute failed"),
WHERE_STATEMENT_ERROR(
"TRANSFORM_COMMON-07",
- "The where statement '<wherebody>' of SQL transform execute
failed");
+ "The where statement '<wherebody>' of SQL transform execute
failed"),
+ ENCRYPTION_FAILED("TRANSFORM_COMMON-08", "Field '<field>' encryption
failed."),
+ ;
private final String code;
private final String description;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldMultiCatalogTransform.java
similarity index 93%
rename from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
rename to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldMultiCatalogTransform.java
index f281b3d855..aabb61c42b 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldMultiCatalogTransform.java
@@ -26,9 +26,9 @@ import
org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
-public class FieldFieldMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
+public class FilterFieldMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
- public FieldFieldMultiCatalogTransform(
+ public FilterFieldMultiCatalogTransform(
List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
super(inputCatalogTables, config);
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index 3315224905..f390f48424 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -49,7 +49,7 @@ public class FilterFieldTransformFactory implements
TableTransformFactory {
@Override
public TableTransform createTransform(TableTransformFactoryContext
context) {
return () ->
- new FieldFieldMultiCatalogTransform(
+ new FilterFieldMultiCatalogTransform(
context.getCatalogTables(), context.getOptions());
}
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformTest.java
new file mode 100644
index 0000000000..3f85c822c2
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/encrypt/FieldEncryptTransformTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.encrypt;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class FieldEncryptTransformTest {
+ public static final String KEY = "base64:AAAAAAAAAAAAAAAAAAAAAA==";
+ private static CatalogTable catalogTable;
+ private static Object[] values;
+ private static Object[] original;
+ private List<String> encryptFields = Arrays.asList("key2", "key3");
+
+ @BeforeAll
+ static void setUp() {
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "key1",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key2",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key3",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key4",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key5",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "comment");
+ values = new Object[] {"value1", "value2", "value3", "value4",
"value5"};
+ original = Arrays.copyOf(values, values.length);
+ }
+
+ @Test
+ void testEncryption() {
+ SeaTunnelRow output = encryption();
+ for (int i = 0; i < original.length; i++) {
+ if (i == 1 || i == 2) {
+ Assertions.assertNotEquals(original[i], output.getField(i));
+ } else {
+ Assertions.assertEquals(original[i], output.getField(i));
+ }
+ }
+ }
+
+ @Test
+ void testDecryption() {
+ SeaTunnelRow output = encryption();
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+ configMap.put(FieldEncryptTransformConfig.MODE.key(), "decrypt");
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ SeaTunnelRow input = new SeaTunnelRow(output.getFields());
+ SeaTunnelRow decryptedRow = fieldEncryptTransform.transformRow(input);
+ Assertions.assertNotNull(decryptedRow);
+ Assertions.assertEquals("value2", decryptedRow.getField(1));
+ Assertions.assertEquals("value3", decryptedRow.getField(2));
+ }
+
+ @Test
+ void testNullField() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ Object[] valuesWithNull = new Object[] {"value1", null, "value3",
"value4", "value5"};
+ SeaTunnelRow input = new SeaTunnelRow(valuesWithNull);
+ SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+ Assertions.assertNull(output.getField(1));
+ Assertions.assertNotNull(output.getField(2));
+ }
+
+ @Test
+ void testEmptyString() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ Object[] valuesWithEmpty = new Object[] {"value1", "", " ",
"value4", "value5"};
+ SeaTunnelRow input = new SeaTunnelRow(valuesWithEmpty);
+ SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+ Assertions.assertEquals("", output.getField(1));
+ Assertions.assertEquals(" ", output.getField(2));
+ }
+
+ @Test
+ void testFieldNotFound() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(),
Arrays.asList("nonExistentField"));
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+ Assertions.assertThrows(
+ TransformException.class,
+ () -> new
FieldEncryptTransform(ReadonlyConfig.fromMap(configMap), catalogTable));
+ }
+
+ @Test
+ void testInvalidKeyLength() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(),
"base64:AAAAAAA=");
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+ SeaTunnelRow input = new SeaTunnelRow(values);
+
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class, () ->
fieldEncryptTransform.transformRow(input));
+ }
+
+ @Test
+ void testUnsupportedAlgorithm() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+ configMap.put(FieldEncryptTransformConfig.ALGORITHM.key(),
"INVALID_ALGORITHM");
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+ SeaTunnelRow input = new SeaTunnelRow(values);
+
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () -> {
+ fieldEncryptTransform.transformRow(input);
+ });
+ }
+
+ @Test
+ void testInvalidMode() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+ configMap.put(FieldEncryptTransformConfig.MODE.key(), "invalid_mode");
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+ SeaTunnelRow input = new SeaTunnelRow(values);
+
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () -> {
+ fieldEncryptTransform.transformRow(input);
+ });
+ }
+
+ @Test
+ void testNonStringField() {
+ CatalogTable intCatalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "key1",
+ BasicType.INT_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "comment");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(),
Arrays.asList("key1"));
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ new FieldEncryptTransform(
+ ReadonlyConfig.fromMap(configMap),
intCatalogTable));
+ }
+
+ @Test
+ void testFieldExceedsMaxLength() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+ configMap.put(FieldEncryptTransformConfig.MAX_FIELD_LENGTH.key(), 10);
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ Object[] oversizedValues =
+ new Object[] {"value1", "thisvalueiswaytoolong", "value3",
"value4", "value5"};
+ SeaTunnelRow input = new SeaTunnelRow(oversizedValues);
+
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class, () ->
fieldEncryptTransform.transformRow(input));
+ }
+
+ @Test
+ void testFieldExactlyMaxLength() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+ configMap.put(FieldEncryptTransformConfig.MAX_FIELD_LENGTH.key(), 6);
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ Object[] exactValues = new Object[] {"value1", "value2", "value3",
"value4", "value5"};
+ SeaTunnelRow input = new SeaTunnelRow(exactValues);
+ SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+ Assertions.assertNotNull(output);
+ Assertions.assertNotEquals("value2", output.getField(1));
+ }
+
+ @Test
+ void testMaxFieldLengthWithNullField() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+ configMap.put(FieldEncryptTransformConfig.MAX_FIELD_LENGTH.key(), 3);
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ Object[] valuesWithNull = new Object[] {"value1", null, "val",
"value4", "value5"};
+ SeaTunnelRow input = new SeaTunnelRow(valuesWithNull);
+ SeaTunnelRow output = fieldEncryptTransform.transformRow(input);
+
+ Assertions.assertNull(output.getField(1));
+ }
+
+ private SeaTunnelRow encryption() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FieldEncryptTransformConfig.FIELDS.key(), encryptFields);
+ configMap.put(FieldEncryptTransformConfig.KEY.key(), KEY);
+
+ FieldEncryptTransform fieldEncryptTransform =
+ new FieldEncryptTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ SeaTunnelRow input = new SeaTunnelRow(values);
+ return fieldEncryptTransform.transformRow(input);
+ }
+}