This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 06df51bcbb [Feature][Transform] Support rename table/column (#8170)
06df51bcbb is described below
commit 06df51bcbbb3294f40ebb5607c70bc031c5175f8
Author: hailin0 <[email protected]>
AuthorDate: Thu Dec 5 14:03:10 2024 +0800
[Feature][Transform] Support rename table/column (#8170)
---
docs/en/transform-v2/field-rename.md | 132 ++++++++
docs/en/transform-v2/table-rename.md | 132 ++++++++
plugin-mapping.properties | 2 +
.../connector/TransformSpecificationCheckTest.java | 2 +-
.../seatunnel/e2e/transform/TestRenameIT.java | 43 +++
.../resources/table_field_rename_multi_table.conf | 228 ++++++++++++++
.../seatunnel/transform/rename/ConvertCase.java | 23 ++
.../transform/rename/FieldRenameConfig.java | 129 ++++++++
.../rename/FieldRenameMultiCatalogTransform.java | 45 +++
.../transform/rename/FieldRenameTransform.java | 333 +++++++++++++++++++++
.../rename/FieldRenameTransformFactory.java | 58 ++++
.../transform/rename/TableRenameConfig.java | 98 ++++++
.../rename/TableRenameMultiCatalogTransform.java | 45 +++
.../transform/rename/TableRenameTransform.java | 267 +++++++++++++++++
.../rename/TableRenameTransformFactory.java | 56 ++++
.../transform/rename/FieldRenameTransformTest.java | 243 +++++++++++++++
.../transform/rename/TableRenameTransformTest.java | 150 ++++++++++
17 files changed, 1985 insertions(+), 1 deletion(-)
diff --git a/docs/en/transform-v2/field-rename.md
b/docs/en/transform-v2/field-rename.md
new file mode 100644
index 0000000000..faf198b695
--- /dev/null
+++ b/docs/en/transform-v2/field-rename.md
@@ -0,0 +1,132 @@
+# FieldRename
+
+> FieldRename transform plugin
+
+## Description
+
+FieldRename transform plugin for rename field name.
+
+## Options
+
+| name | type | required | default value | Description
|
+|:-----------------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
+| convert_case | string | no | | The case
conversion type. The options can be `UPPER`, `LOWER`
|
+| prefix | string | no | | The prefix to
be added to the field name
|
+| suffix | string | no | | The suffix to
be added to the field name
|
+| replacements_with_regex | array | no | | The array of
replacement rules with regex. The replacement rule is a map with `replace_from`
and `replace_to` fields. |
+
+## Examples
+
+### Convert field to uppercase
+
+```
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "customers_mysql_cdc"
+
+ username = "root"
+ password = "123456"
+ table-names = ["source.user_shop", "source.user_order"]
+ base-url = "jdbc:mysql://localhost:3306/source"
+ }
+}
+
+transform {
+ FieldRename {
+ plugin_input = "customers_mysql_cdc"
+ plugin_output = "trans_result"
+
+ convert_case = "UPPER"
+ prefix = "F_"
+ suffix = "_S"
+ replacements_with_regex = [
+ {
+ replace_from = "create_time"
+ replace_to = "SOURCE_CREATE_TIME"
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "trans_result"
+
+ driver="oracle.jdbc.OracleDriver"
+ url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+ user="myuser"
+ password="mypwd"
+
+ generate_sink_sql = true
+ database = "ORCLCDB"
+ table = "${database_name}.${table_name}"
+ primary_keys = ["${primary_key}"]
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
+### Convert field name to lowercase
+
+```
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Oracle-CDC {
+ plugin_output = "customers_oracle_cdc"
+
+ base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB"
+ username = "dbzuser"
+ password = "dbz"
+ database-names = ["ORCLCDB"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"]
+ }
+}
+
+transform {
+ FieldRename {
+ plugin_input = "customers_oracle_cdc"
+ plugin_output = "trans_result"
+
+ convert_case = "LOWER"
+ prefix = "f_"
+ suffix = "_s"
+ replacements_with_regex = [
+ {
+ replace_from = "CREATE_TIME"
+ replace_to = "source_create_time"
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "trans_result"
+
+ url = "jdbc:mysql://localhost:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_sink"
+ password = "mysqlpw"
+
+ generate_sink_sql = true
+ database = "${schema_name}"
+ table = "${table_name}"
+ primary_keys = ["${primary_key}"]
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/en/transform-v2/table-rename.md
b/docs/en/transform-v2/table-rename.md
new file mode 100644
index 0000000000..6cd1a60de7
--- /dev/null
+++ b/docs/en/transform-v2/table-rename.md
@@ -0,0 +1,132 @@
+# TableRename
+
+> TableRename transform plugin
+
+## Description
+
+TableRename transform plugin for rename table name.
+
+## Options
+
+| name | type | required | default value | Description
|
+|:-----------------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
+| convert_case | string | no | | The case
conversion type. The options can be `UPPER`, `LOWER`
|
+| prefix | string | no | | The prefix to
be added to the table name
|
+| suffix | string | no | | The suffix to
be added to the table name
|
+| replacements_with_regex | array | no | | The array of
replacement rules with regex. The replacement rule is a map with `replace_from`
and `replace_to` fields. |
+
+## Examples
+
+### Convert table name to uppercase
+
+```
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "customers_mysql_cdc"
+
+ username = "root"
+ password = "123456"
+ table-names = ["source.user_shop", "source.user_order"]
+ base-url = "jdbc:mysql://localhost:3306/source"
+ }
+}
+
+transform {
+ TableRename {
+ plugin_input = "customers_mysql_cdc"
+ plugin_output = "trans_result"
+
+ convert_case = "UPPER"
+ prefix = "CDC_"
+ suffix = "_TABLE"
+ replacements_with_regex = [
+ {
+ replace_from = "user"
+ replace_to = "U"
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "trans_result"
+
+ driver="oracle.jdbc.OracleDriver"
+ url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+ user="myuser"
+ password="mypwd"
+
+ generate_sink_sql = true
+ database = "ORCLCDB"
+ table = "${database_name}.${table_name}"
+ primary_keys = ["${primary_key}"]
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
+### Convert table name to lowercase
+
+```
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Oracle-CDC {
+ plugin_output = "customers_oracle_cdc"
+
+ base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB"
+ username = "dbzuser"
+ password = "dbz"
+ database-names = ["ORCLCDB"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"]
+ }
+}
+
+transform {
+ TableRename {
+ plugin_input = "customers_oracle_cdc"
+ plugin_output = "trans_result"
+
+ convert_case = "LOWER"
+ prefix = "cdc_"
+ suffix = "_table"
+ replacements_with_regex = [
+ {
+ replace_from = "USER"
+ replace_to = "u"
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "trans_result"
+
+ url = "jdbc:mysql://localhost:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_sink"
+ password = "mysqlpw"
+
+ generate_sink_sql = true
+ database = "${schema_name}"
+ table = "${table_name}"
+ primary_keys = ["${primary_key}"]
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c494686161..32cc214c21 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -155,3 +155,5 @@ seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
seatunnel.transform.Metadata = seatunnel-transforms-v2
+seatunnel.transform.FieldRename = seatunnel-transforms-v2
+seatunnel.transform.TableRename = seatunnel-transforms-v2
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
index bb3fe1f55b..89f3ec9c48 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
@@ -43,7 +43,7 @@ public class TransformSpecificationCheckTest {
FactoryUtil.discoverFactories(
Thread.currentThread().getContextClassLoader(),
TableTransformFactory.class);
- Assertions.assertEquals(13, factories.size());
+ Assertions.assertEquals(15, factories.size());
}
@Test
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java
new file mode 100644
index 0000000000..8b2e55551b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestRenameIT extends TestSuiteBase {
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not multi table transform")
+ @TestTemplate
+ public void testRenameMultiTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/table_field_rename_multi_table.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf
new file mode 100644
index 0000000000..254f2032d4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "source1"
+
+ tables_configs = [
+ {
+ row.num = 3
+ schema = {
+ table = "test.abc"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 5
+ schema = {
+ table = "test.xyz"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 10
+ schema = {
+ table = "test.www"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+transform {
+ TableRename {
+ plugin_input = "source1"
+ plugin_output = "transform1"
+
+ table_match_regex = "test.a.*"
+ table_transform = [{
+ table_path = "test.xyz"
+ convert_case = "UPPER"
+ prefix = "P2_"
+ suffix = "_S2"
+ replacements_with_regex = [
+ {
+ replace_from = "z"
+ replace_to = "ZZ"
+ }
+ ]
+ }]
+ convert_case = "UPPER"
+ prefix = "P1_"
+ suffix = "_S1"
+ replacements_with_regex = [
+ {
+ replace_from = "c"
+ replace_to = "CC"
+ }
+ ]
+ }
+
+ FieldRename {
+ plugin_input = "transform1"
+ plugin_output = "transform2"
+
+ table_match_regex = "TEST.P.*"
+ table_transform = [{
+ table_path = "TEST.P2_XYZZ_S2"
+ convert_case = "UPPER"
+ prefix = "F_P2_"
+ suffix = "_S2_F"
+ replacements_with_regex = [
+ {
+ replace_from = "id"
+ replace_to = "ID_1"
+ }
+ ]
+ }]
+ convert_case = "UPPER"
+ prefix = "F_P1_"
+ suffix = "_S1_F"
+ replacements_with_regex = [
+ {
+ replace_from = "name"
+ replace_to = "NAME_1"
+ }
+ ]
+ }
+}
+sink {
+ Assert {
+ plugin_input = "transform2"
+
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "TEST.P1_ABCC_S1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = F_P1_ID_S1_F
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = F_P1_NAME_1_S1_F
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "TEST.P2_XYZZ_S2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = F_P2_ID_1_S2_F
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = F_P2_NAME_S2_F
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ },
+ {
+ table_path = "test.www"
+ catalog_table_rule {
+ table_path = "test.www"
+ column_rule = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java
new file mode 100644
index 0000000000..5dafb26cd3
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+public enum ConvertCase {
+ LOWER,
+ UPPER
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java
new file mode 100644
index 0000000000..7bd8ec1495
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+public class FieldRenameConfig implements Serializable {
+
+ public static final Option<ConvertCase> CONVERT_CASE =
+ Options.key("convert_case")
+ .enumType(ConvertCase.class)
+ .noDefaultValue()
+ .withDescription("Convert to uppercase or lowercase");
+
+ public static final Option<String> PREFIX =
+ Options.key("prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Add prefix for field name");
+
+ public static final Option<String> SUFFIX =
+ Options.key("suffix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Add suffix for field name");
+
+ public static final Option<List<ReplacementsWithRegex>>
REPLACEMENTS_WITH_REGEX =
+ Options.key("replacements_with_regex")
+ .listType(ReplacementsWithRegex.class)
+ .noDefaultValue()
+ .withDescription("The regex of replace fields name to ");
+
+ public static final Option<List<SpecificModify>> SPECIFIC =
+ Options.key("specific")
+ .listType(SpecificModify.class)
+ .noDefaultValue()
+ .withDescription("The specific modify field name");
+
+ @JsonAlias("table_match_regex")
+ private String tableMatchRegex;
+
+ @JsonAlias("is_table_match_regex")
+ private Boolean isTableMatchRegex;
+
+ @JsonAlias("match_tables")
+ private List<String> matchTables;
+
+ @JsonAlias("convert_case")
+ private ConvertCase convertCase;
+
+ @JsonAlias("prefix")
+ private String prefix;
+
+ @JsonAlias("suffix")
+ private String suffix;
+
+ @JsonAlias("replacements_with_regex")
+ private List<ReplacementsWithRegex> replacementsWithRegex;
+
+ @JsonAlias("specific")
+ private List<SpecificModify> specific;
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class SpecificModify implements Serializable {
+ @JsonAlias("field_name")
+ private String fieldName;
+
+ @JsonAlias("target_name")
+ private String targetName;
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class ReplacementsWithRegex implements Serializable {
+ @JsonAlias("replace_from")
+ private String replaceFrom;
+
+ @JsonAlias("replace_to")
+ private String replaceTo;
+
+ @JsonAlias("is_regex")
+ private Boolean isRegex;
+ }
+
+ public static FieldRenameConfig of(ReadonlyConfig config) {
+ FieldRenameConfig renameConfig = new FieldRenameConfig();
+ renameConfig.setConvertCase(config.get(CONVERT_CASE));
+ renameConfig.setPrefix(config.get(PREFIX));
+ renameConfig.setSuffix(config.get(SUFFIX));
+
renameConfig.setReplacementsWithRegex(config.get(REPLACEMENTS_WITH_REGEX));
+ renameConfig.setSpecific(config.get(SPECIFIC));
+ return renameConfig;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
new file mode 100644
index 0000000000..dd0f61e4e4
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import java.util.List;
+
+public class FieldRenameMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
+
+ public FieldRenameMultiCatalogTransform(
+ List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+ super(inputCatalogTables, config);
+ }
+
+ @Override
+ public String getPluginName() {
+ return FieldRenameTransform.PLUGIN_NAME;
+ }
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+ CatalogTable table, ReadonlyConfig config) {
+ return new FieldRenameTransform(FieldRenameConfig.of(config), table);
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java
new file mode 100644
index 0000000000..b151b355ac
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
+import
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class FieldRenameTransform extends AbstractCatalogSupportMapTransform {
+ public static String PLUGIN_NAME = "FieldRename";
+
+ private CatalogTable inputTable;
+ private final FieldRenameConfig config;
+ private TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
+
+ public FieldRenameTransform(FieldRenameConfig config, CatalogTable table) {
+ super(table);
+ this.config = config;
+ this.inputTable = table;
+ this.tableSchemaChangeEventHandler = new
TableSchemaChangeEventDispatcher();
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ return inputRow;
+ }
+
+ @Override
+ public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) {
+ TableSchema newTableSchema =
+
tableSchemaChangeEventHandler.reset(inputTable.getTableSchema()).apply(event);
+ this.inputTable =
+ CatalogTable.of(
+ inputTable.getTableId(),
+ newTableSchema,
+ inputTable.getOptions(),
+ inputTable.getPartitionKeys(),
+ inputTable.getComment());
+
+ if (event instanceof AlterTableColumnsEvent) {
+ AlterTableColumnsEvent alterTableColumnsEvent =
(AlterTableColumnsEvent) event;
+ AlterTableColumnsEvent newEvent =
+ new AlterTableColumnsEvent(
+ event.tableIdentifier(),
+ alterTableColumnsEvent.getEvents().stream()
+ .map(this::convertName)
+ .collect(Collectors.toList()));
+
+ newEvent.setJobId(event.getJobId());
+ newEvent.setStatement(((AlterTableColumnsEvent)
event).getStatement());
+ newEvent.setSourceDialectName(((AlterTableColumnsEvent)
event).getSourceDialectName());
+ if (event.getChangeAfter() != null) {
+ newEvent.setChangeAfter(
+ CatalogTable.of(
+ event.getChangeAfter().getTableId(),
event.getChangeAfter()));
+ }
+ return newEvent;
+ }
+ if (event instanceof AlterTableColumnEvent) {
+ return convertName((AlterTableColumnEvent) event);
+ }
+ return event;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return convertTableSchema(inputTable.getTableSchema());
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputTable.getTableId();
+ }
+
+ @VisibleForTesting
+ public String convertName(String name) {
+ if (name == null) {
+ return null;
+ }
+
+ Optional<FieldRenameConfig.SpecificModify> specificValue =
getSpecificModify(name);
+ if (specificValue.isPresent()) {
+ return specificValue.get().getTargetName();
+ }
+ String replaceTo = null;
+ Map<Integer, Integer> replaceIndex = new LinkedHashMap<>();
+
+ if (CollectionUtils.isNotEmpty(config.getReplacementsWithRegex())) {
+ for (FieldRenameConfig.ReplacementsWithRegex replacementsWithRegex
:
+ config.getReplacementsWithRegex()) {
+ Boolean isRegex = replacementsWithRegex.getIsRegex();
+ String replacement = replacementsWithRegex.getReplaceFrom();
+ if (StringUtils.isNotEmpty(replacement)) {
+ Map<Integer, Integer> matched = new LinkedHashMap<>();
+ if (BooleanUtils.isNotTrue(isRegex)) {
+ if (StringUtils.equals(replacement, name)) {
+ matched.put(0, name.length());
+ }
+ } else {
+ Matcher matcher =
Pattern.compile(replacement).matcher(name);
+ while (matcher.find()) {
+ matched.put(matcher.start(), matcher.end());
+ }
+ }
+ if (!matched.isEmpty()) {
+ replaceTo = replacementsWithRegex.getReplaceTo();
+ replaceIndex = matched;
+ }
+ }
+ }
+ }
+
+ if (config.getConvertCase() != null) {
+ switch (config.getConvertCase()) {
+ case UPPER:
+ name = name.toUpperCase();
+ break;
+ case LOWER:
+ name = name.toLowerCase();
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported convert case: " +
config.getConvertCase());
+ }
+ }
+ int offset = 0;
+ for (Map.Entry<Integer, Integer> index : replaceIndex.entrySet()) {
+ int indexStart = index.getKey();
+ int indexEnd = index.getValue();
+ name =
+ name.substring(0, indexStart + offset)
+ + replaceTo.trim()
+ + name.substring(indexEnd + offset);
+ offset += replaceTo.trim().length() - (indexEnd - indexStart);
+ }
+ if (StringUtils.isNotBlank(config.getPrefix())) {
+ name = config.getPrefix().trim() + name;
+ }
+ if (StringUtils.isNotBlank(config.getSuffix())) {
+ name = name + config.getSuffix().trim();
+ }
+ return name;
+ }
+
+ private Optional<FieldRenameConfig.SpecificModify>
getSpecificModify(String oldColumnName) {
+ if (config.getSpecific() == null) {
+ return Optional.empty();
+ }
+ return config.getSpecific().stream()
+ .filter(specific ->
specific.getFieldName().equals(oldColumnName))
+ .findFirst();
+ }
+
+ @VisibleForTesting
+ public AlterTableColumnEvent convertName(AlterTableColumnEvent event) {
+ AlterTableColumnEvent newEvent = event;
+ switch (event.getEventType()) {
+ case SCHEMA_CHANGE_ADD_COLUMN:
+ AlterTableAddColumnEvent addColumnEvent =
(AlterTableAddColumnEvent) event;
+ newEvent =
+ new AlterTableAddColumnEvent(
+ event.tableIdentifier(),
+ convertName(addColumnEvent.getColumn()),
+ addColumnEvent.isFirst(),
+ convertName(addColumnEvent.getAfterColumn()));
+ break;
+ case SCHEMA_CHANGE_DROP_COLUMN:
+ AlterTableDropColumnEvent dropColumnEvent =
(AlterTableDropColumnEvent) event;
+ newEvent =
+ new AlterTableDropColumnEvent(
+ event.tableIdentifier(),
convertName(dropColumnEvent.getColumn()));
+ break;
+ case SCHEMA_CHANGE_MODIFY_COLUMN:
+ AlterTableModifyColumnEvent modifyColumnEvent =
(AlterTableModifyColumnEvent) event;
+ newEvent =
+ new AlterTableModifyColumnEvent(
+ event.tableIdentifier(),
+ convertName(modifyColumnEvent.getColumn()),
+ modifyColumnEvent.isFirst(),
+
convertName(modifyColumnEvent.getAfterColumn()));
+ break;
+ case SCHEMA_CHANGE_CHANGE_COLUMN:
+ AlterTableChangeColumnEvent changeColumnEvent =
(AlterTableChangeColumnEvent) event;
+ boolean nameChanged =
+ !changeColumnEvent
+ .getOldColumn()
+
.equals(changeColumnEvent.getColumn().getName());
+ if (nameChanged) {
+ log.warn(
+ "FieldRenameTransform does not support changing
column name, "
+ + "old column name: {}, new column name:
{}",
+ changeColumnEvent.getOldColumn(),
+ changeColumnEvent.getColumn().getName());
+ return changeColumnEvent;
+ }
+
+ newEvent =
+ new AlterTableChangeColumnEvent(
+ event.tableIdentifier(),
+ convertName(changeColumnEvent.getOldColumn()),
+ convertName(changeColumnEvent.getColumn()),
+ changeColumnEvent.isFirst(),
+
convertName(changeColumnEvent.getAfterColumn()));
+ break;
+ default:
+ log.warn("Unsupported event: {}", event);
+ return event;
+ }
+
+ newEvent.setJobId(event.getJobId());
+ newEvent.setStatement(event.getStatement());
+ newEvent.setSourceDialectName(event.getSourceDialectName());
+ if (event.getChangeAfter() != null) {
+ CatalogTable newChangeAfter =
+ CatalogTable.of(
+ event.getChangeAfter().getTableId(),
+
convertTableSchema(event.getChangeAfter().getTableSchema()),
+ event.getChangeAfter().getOptions(),
+ event.getChangeAfter().getPartitionKeys(),
+ event.getChangeAfter().getComment());
+ newEvent.setChangeAfter(newChangeAfter);
+ }
+ return newEvent;
+ }
+
+ private Column convertName(Column column) {
+ return column.rename(convertName(column.getName()));
+ }
+
+ private TableSchema convertTableSchema(TableSchema tableSchema) {
+ List<Column> columns =
+ tableSchema.getColumns().stream()
+ .map(
+ column -> {
+ String newColumnName =
convertName(column.getName());
+ return column.rename(newColumnName);
+ })
+ .collect(Collectors.toList());
+ PrimaryKey primaryKey =
+ Optional.ofNullable(tableSchema.getPrimaryKey())
+ .map(
+ pk ->
+ PrimaryKey.of(
+ pk.getPrimaryKey(),
+ pk.getColumnNames().stream()
+ .map(this::convertName)
+
.collect(Collectors.toList()),
+ pk.getEnableAutoId()))
+ .orElse(null);
+ List<ConstraintKey> constraintKeys =
+ Optional.ofNullable(tableSchema.getConstraintKeys())
+ .map(
+ keyList ->
+ keyList.stream()
+ .map(
+ key ->
+
ConstraintKey.of(
+
key.getConstraintType(),
+
key.getConstraintName(),
+
key.getColumnNames()
+
.stream()
+
.map(
+
column ->
+
ConstraintKey
+
.ConstraintKeyColumn
+
.of(
+
convertName(
+
column
+
.getColumnName()),
+
column
+
.getSortType()))
+
.collect(
+
Collectors
+
.toList())))
+ .collect(Collectors.toList()))
+ .orElse(null);
+ return TableSchema.builder()
+ .columns(columns)
+ .primaryKey(primaryKey)
+ .constraintKey(constraintKeys)
+ .build();
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java
new file mode 100644
index 0000000000..299bbb5255
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+import static
org.apache.seatunnel.transform.rename.FieldRenameConfig.CONVERT_CASE;
+import static org.apache.seatunnel.transform.rename.FieldRenameConfig.PREFIX;
+import static
org.apache.seatunnel.transform.rename.FieldRenameConfig.REPLACEMENTS_WITH_REGEX;
+import static org.apache.seatunnel.transform.rename.FieldRenameConfig.SPECIFIC;
+import static org.apache.seatunnel.transform.rename.FieldRenameConfig.SUFFIX;
+
+@AutoService(Factory.class)
+public class FieldRenameTransformFactory implements TableTransformFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return FieldRenameTransform.PLUGIN_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .optional(CONVERT_CASE, PREFIX, SUFFIX,
REPLACEMENTS_WITH_REGEX, SPECIFIC)
+ .optional(TransformCommonOptions.MULTI_TABLES)
+ .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ return () ->
+ new FieldRenameMultiCatalogTransform(
+ context.getCatalogTables(), context.getOptions());
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java
new file mode 100644
index 0000000000..be2d6a25e2
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+public class TableRenameConfig implements Serializable {
+
+ public static final Option<ConvertCase> CONVERT_CASE =
+ Options.key("convert_case")
+ .enumType(ConvertCase.class)
+ .noDefaultValue()
+ .withDescription("Convert to uppercase or lowercase");
+
+ public static final Option<String> PREFIX =
+ Options.key("prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Add prefix for table name");
+
+ public static final Option<String> SUFFIX =
+ Options.key("suffix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Add suffix for table name");
+
+ public static final Option<List<ReplacementsWithRegex>>
REPLACEMENTS_WITH_REGEX =
+ Options.key("replacements_with_regex")
+ .listType(ReplacementsWithRegex.class)
+ .noDefaultValue()
+ .withDescription("The regex of replace table name to ");
+
+ @JsonAlias("convert_case")
+ private ConvertCase convertCase;
+
+ @JsonAlias("prefix")
+ private String prefix;
+
+ @JsonAlias("suffix")
+ private String suffix;
+
+ @JsonAlias("replacements_with_regex")
+ private List<ReplacementsWithRegex> replacementsWithRegex;
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class ReplacementsWithRegex implements Serializable {
+ @JsonAlias("replace_from")
+ private String replaceFrom;
+
+ @JsonAlias("replace_to")
+ private String replaceTo;
+
+ private final Boolean isRegex = true;
+ }
+
+ public static TableRenameConfig of(ReadonlyConfig config) {
+ TableRenameConfig renameConfig = new TableRenameConfig();
+ renameConfig.setConvertCase(config.get(CONVERT_CASE));
+ renameConfig.setPrefix(config.get(PREFIX));
+ renameConfig.setSuffix(config.get(SUFFIX));
+
renameConfig.setReplacementsWithRegex(config.get(REPLACEMENTS_WITH_REGEX));
+ return renameConfig;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
new file mode 100644
index 0000000000..67cff881da
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import java.util.List;
+
+public class TableRenameMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
+
+ public TableRenameMultiCatalogTransform(
+ List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+ super(inputCatalogTables, config);
+ }
+
+ @Override
+ public String getPluginName() {
+ return TableRenameTransform.PLUGIN_NAME;
+ }
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+ CatalogTable table, ReadonlyConfig config) {
+ return new TableRenameTransform(TableRenameConfig.of(config), table);
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java
new file mode 100644
index 0000000000..d1a3156115
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TableRenameTransform extends AbstractCatalogSupportMapTransform {
+ public static String PLUGIN_NAME = "TableRename";
+
+ private final CatalogTable inputTable;
+ private final TableRenameConfig config;
+
+ private TablePath outputTablePath;
+ private String outputTableId;
+
+ public TableRenameTransform(TableRenameConfig config, CatalogTable table) {
+ super(table);
+ this.inputTable = table;
+ this.config = config;
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return TableSchema.builder()
+ .columns(inputTable.getTableSchema().getColumns())
+ .constraintKey(inputTable.getTableSchema().getConstraintKeys())
+ .primaryKey(inputTable.getTableSchema().getPrimaryKey())
+ .build();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ TablePath inputTablePath = inputTable.getTablePath();
+ String inputDatabaseName = inputTablePath.getDatabaseName();
+ String inputSchemaName = inputTablePath.getSchemaName();
+ String inputTableName = inputTablePath.getTableName();
+
+ String outputDatabaseName =
+
Optional.ofNullable(inputDatabaseName).map(this::convertCase).orElse(null);
+ String outputSchemaName =
+
Optional.ofNullable(inputSchemaName).map(this::convertCase).orElse(null);
+ String outputTableName = convertName(inputTableName);
+ TablePath outputTablePath =
+ TablePath.of(outputDatabaseName, outputSchemaName,
outputTableName);
+ this.outputTablePath = outputTablePath;
+ this.outputTableId = outputTablePath.getFullName();
+ return TableIdentifier.of(inputTable.getCatalogName(),
outputTablePath);
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ if (inputRow.getTableId() == null) {
+ log.debug("Table id is null, skip renaming");
+ return inputRow;
+ }
+ if (outputTableId.equals(inputRow.getTableId())) {
+ return inputRow;
+ }
+
+ SeaTunnelRow outputRow = inputRow.copy();
+ outputRow.setTableId(outputTableId);
+ return outputRow;
+ }
+
+ @Override
+ public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) {
+ TablePath inputTablePath = event.tablePath();
+ if (inputTablePath == null) {
+ return event;
+ }
+ if (outputTablePath.equals(inputTablePath)) {
+ return event;
+ }
+
+ if (event instanceof AlterTableColumnsEvent) {
+ TableIdentifier newTableIdentifier =
+
TableIdentifier.of(event.tableIdentifier().getCatalogName(), outputTablePath);
+ AlterTableColumnsEvent alterTableColumnsEvent =
(AlterTableColumnsEvent) event;
+ AlterTableColumnsEvent newEvent =
+ new AlterTableColumnsEvent(
+ newTableIdentifier,
+ alterTableColumnsEvent.getEvents().stream()
+ .map(this::convertName)
+ .collect(Collectors.toList()));
+
+ newEvent.setJobId(event.getJobId());
+ newEvent.setStatement(((AlterTableColumnsEvent)
event).getStatement());
+ newEvent.setSourceDialectName(((AlterTableColumnsEvent)
event).getSourceDialectName());
+ if (event.getChangeAfter() != null) {
+ newEvent.setChangeAfter(
+ CatalogTable.of(newTableIdentifier,
event.getChangeAfter()));
+ }
+ return newEvent;
+ }
+ if (event instanceof AlterTableColumnEvent) {
+ return convertName((AlterTableColumnEvent) event);
+ }
+ return event;
+ }
+
+ public String convertCase(String name) {
+ if (config.getConvertCase() != null) {
+ switch (config.getConvertCase()) {
+ case UPPER:
+ return name.toUpperCase();
+ case LOWER:
+ return name.toLowerCase();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported convert case: " +
config.getConvertCase());
+ }
+ }
+ return name;
+ }
+
+ @VisibleForTesting
+ public String convertName(String tableName) {
+ String replaceTo = null;
+ Map<Integer, Integer> replaceIndex = new LinkedHashMap<>();
+
+ if (CollectionUtils.isNotEmpty(config.getReplacementsWithRegex())) {
+ for (TableRenameConfig.ReplacementsWithRegex replacementsWithRegex
:
+ config.getReplacementsWithRegex()) {
+ Boolean isRegex = replacementsWithRegex.getIsRegex();
+ String replacement = replacementsWithRegex.getReplaceFrom();
+ if (StringUtils.isNotEmpty(replacement)) {
+ Map<Integer, Integer> matched = new LinkedHashMap<>();
+ if (BooleanUtils.isNotTrue(isRegex)) {
+ if (StringUtils.equals(replacement, tableName)) {
+ matched.put(0, tableName.length());
+ }
+ } else {
+ Matcher matcher =
Pattern.compile(replacement).matcher(tableName);
+ while (matcher.find()) {
+ matched.put(matcher.start(), matcher.end());
+ }
+ }
+ if (!matched.isEmpty()) {
+ replaceTo = replacementsWithRegex.getReplaceTo();
+ replaceIndex = matched;
+ }
+ }
+ }
+ }
+
+ tableName = convertCase(tableName);
+
+ int offset = 0;
+ for (Map.Entry<Integer, Integer> index : replaceIndex.entrySet()) {
+ int indexStart = index.getKey();
+ int indexEnd = index.getValue();
+ tableName =
+ tableName.substring(0, indexStart + offset)
+ + replaceTo.trim()
+ + tableName.substring(indexEnd + offset);
+ offset += replaceTo.trim().length() - (indexEnd - indexStart);
+ }
+ if (StringUtils.isNotBlank(config.getPrefix())) {
+ tableName = config.getPrefix().trim() + tableName;
+ }
+ if (StringUtils.isNotBlank(config.getSuffix())) {
+ tableName = tableName + config.getSuffix().trim();
+ }
+ return tableName;
+ }
+
+ @VisibleForTesting
+ public AlterTableColumnEvent convertName(AlterTableColumnEvent event) {
+ TableIdentifier newTableIdentifier =
+ TableIdentifier.of(event.tableIdentifier().getCatalogName(),
outputTablePath);
+ AlterTableColumnEvent newEvent = event;
+ switch (event.getEventType()) {
+ case SCHEMA_CHANGE_ADD_COLUMN:
+ AlterTableAddColumnEvent addColumnEvent =
(AlterTableAddColumnEvent) event;
+ newEvent =
+ new AlterTableAddColumnEvent(
+ newTableIdentifier,
+ addColumnEvent.getColumn(),
+ addColumnEvent.isFirst(),
+ addColumnEvent.getAfterColumn());
+ break;
+ case SCHEMA_CHANGE_DROP_COLUMN:
+ AlterTableDropColumnEvent dropColumnEvent =
(AlterTableDropColumnEvent) event;
+ newEvent =
+ new AlterTableDropColumnEvent(
+ newTableIdentifier,
dropColumnEvent.getColumn());
+ break;
+ case SCHEMA_CHANGE_MODIFY_COLUMN:
+ AlterTableModifyColumnEvent modifyColumnEvent =
(AlterTableModifyColumnEvent) event;
+ newEvent =
+ new AlterTableModifyColumnEvent(
+ newTableIdentifier,
+ modifyColumnEvent.getColumn(),
+ modifyColumnEvent.isFirst(),
+ modifyColumnEvent.getAfterColumn());
+ break;
+ case SCHEMA_CHANGE_CHANGE_COLUMN:
+ AlterTableChangeColumnEvent changeColumnEvent =
(AlterTableChangeColumnEvent) event;
+ newEvent =
+ new AlterTableChangeColumnEvent(
+ newTableIdentifier,
+ changeColumnEvent.getOldColumn(),
+ changeColumnEvent.getColumn(),
+ changeColumnEvent.isFirst(),
+ changeColumnEvent.getAfterColumn());
+ break;
+ default:
+ log.warn("Unsupported event: {}", event);
+ return event;
+ }
+
+ newEvent.setJobId(event.getJobId());
+ newEvent.setStatement(event.getStatement());
+ newEvent.setSourceDialectName(event.getSourceDialectName());
+ if (event.getChangeAfter() != null) {
+ newEvent.setChangeAfter(CatalogTable.of(newTableIdentifier,
event.getChangeAfter()));
+ }
+ return newEvent;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java
new file mode 100644
index 0000000000..840bfff209
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+import static
org.apache.seatunnel.transform.rename.TableRenameConfig.CONVERT_CASE;
+import static org.apache.seatunnel.transform.rename.TableRenameConfig.PREFIX;
+import static
org.apache.seatunnel.transform.rename.TableRenameConfig.REPLACEMENTS_WITH_REGEX;
+import static org.apache.seatunnel.transform.rename.TableRenameConfig.SUFFIX;
+
+@AutoService(Factory.class)
+public class TableRenameTransformFactory implements TableTransformFactory {
+ @Override
+ public String factoryIdentifier() {
+ return TableRenameTransform.PLUGIN_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .optional(CONVERT_CASE, PREFIX, SUFFIX,
REPLACEMENTS_WITH_REGEX)
+ .optional(TransformCommonOptions.MULTI_TABLES)
+ .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ return () ->
+ new TableRenameMultiCatalogTransform(
+ context.getCatalogTables(), context.getOptions());
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java
new file mode 100644
index 0000000000..02a8b01bcf
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+public class FieldRenameTransformTest {
+
+ private static final CatalogTable DEFAULT_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("test", "Database-x", "Schema-x",
"Table-x"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ null,
+ false,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f2",
+ BasicType.LONG_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f3",
+ BasicType.LONG_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .primaryKey(PrimaryKey.of("pk1",
Arrays.asList("f1")))
+ .constraintKey(
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType.UNIQUE_KEY,
+ "uk1",
+ Arrays.asList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "f2",
ConstraintKey.ColumnSortType.ASC),
+
ConstraintKey.ConstraintKeyColumn.of(
+ "f3",
+
ConstraintKey.ColumnSortType.ASC))))
+ .build(),
+ Collections.emptyMap(),
+ Collections.singletonList("f2"),
+ null);
+
+ @Test
+ public void testRename() {
+ AlterTableAddColumnEvent addColumnEvent =
+ AlterTableAddColumnEvent.add(
+ DEFAULT_TABLE.getTableId(),
+ PhysicalColumn.of("f4", BasicType.LONG_TYPE, null,
null, true, null, null));
+ AlterTableModifyColumnEvent modifyColumnEvent =
+ AlterTableModifyColumnEvent.modify(
+ DEFAULT_TABLE.getTableId(),
+ PhysicalColumn.of("f4", BasicType.INT_TYPE, null,
null, true, null, null));
+ AlterTableChangeColumnEvent changeColumnEvent =
+ AlterTableChangeColumnEvent.change(
+ DEFAULT_TABLE.getTableId(),
+ "f4",
+ PhysicalColumn.of("f5", BasicType.INT_TYPE, null,
null, true, null, null));
+ AlterTableDropColumnEvent dropColumnEvent =
+ new AlterTableDropColumnEvent(DEFAULT_TABLE.getTableId(),
"f5");
+
+ FieldRenameConfig config = new
FieldRenameConfig().setConvertCase(ConvertCase.LOWER);
+ FieldRenameTransform transform = new FieldRenameTransform(config,
DEFAULT_TABLE);
+ CatalogTable outputCatalogTable = transform.getProducedCatalogTable();
+ AlterTableAddColumnEvent outputAddEvent =
+ (AlterTableAddColumnEvent)
transform.mapSchemaChangeEvent(addColumnEvent);
+ AlterTableModifyColumnEvent outputModifyEvent =
+ (AlterTableModifyColumnEvent)
transform.mapSchemaChangeEvent(modifyColumnEvent);
+ AlterTableChangeColumnEvent outputChangeEvent =
+ (AlterTableChangeColumnEvent)
transform.mapSchemaChangeEvent(changeColumnEvent);
+ AlterTableDropColumnEvent outputDropEvent =
+ (AlterTableDropColumnEvent)
transform.mapSchemaChangeEvent(dropColumnEvent);
+
+ Assertions.assertIterableEquals(
+ Arrays.asList("f1", "f2", "f3"),
+
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+ Assertions.assertIterableEquals(
+ Arrays.asList("f1"),
+
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+ outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .forEach(
+ key ->
+ Assertions.assertIterableEquals(
+ Arrays.asList("f2", "f3"),
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList())));
+ Assertions.assertEquals("f4", outputAddEvent.getColumn().getName());
+ Assertions.assertEquals("f4", outputModifyEvent.getColumn().getName());
+ Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+ Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+ Assertions.assertEquals("f5", outputDropEvent.getColumn());
+
+ config = new FieldRenameConfig().setConvertCase(ConvertCase.UPPER);
+ transform = new FieldRenameTransform(config, DEFAULT_TABLE);
+ outputCatalogTable = transform.getProducedCatalogTable();
+ outputAddEvent = (AlterTableAddColumnEvent)
transform.mapSchemaChangeEvent(addColumnEvent);
+ outputModifyEvent =
+ (AlterTableModifyColumnEvent)
transform.mapSchemaChangeEvent(modifyColumnEvent);
+ outputChangeEvent =
+ (AlterTableChangeColumnEvent)
transform.mapSchemaChangeEvent(changeColumnEvent);
+ outputDropEvent =
+ (AlterTableDropColumnEvent)
transform.mapSchemaChangeEvent(dropColumnEvent);
+ Assertions.assertIterableEquals(
+ Arrays.asList("F1", "F2", "F3"),
+
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+ Assertions.assertIterableEquals(
+ Arrays.asList("F1"),
+
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+ outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .forEach(
+ key ->
+ Assertions.assertIterableEquals(
+ Arrays.asList("F2", "F3"),
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList())));
+ Assertions.assertEquals("F4", outputAddEvent.getColumn().getName());
+ Assertions.assertEquals("F4", outputModifyEvent.getColumn().getName());
+ Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+ Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+ Assertions.assertEquals("F5", outputDropEvent.getColumn());
+
+ config = new FieldRenameConfig().setPrefix("p-").setSuffix("-s");
+ transform = new FieldRenameTransform(config, DEFAULT_TABLE);
+ outputCatalogTable = transform.getProducedCatalogTable();
+ outputAddEvent = (AlterTableAddColumnEvent)
transform.mapSchemaChangeEvent(addColumnEvent);
+ outputModifyEvent =
+ (AlterTableModifyColumnEvent)
transform.mapSchemaChangeEvent(modifyColumnEvent);
+ outputChangeEvent =
+ (AlterTableChangeColumnEvent)
transform.mapSchemaChangeEvent(changeColumnEvent);
+ outputDropEvent =
+ (AlterTableDropColumnEvent)
transform.mapSchemaChangeEvent(dropColumnEvent);
+ Assertions.assertIterableEquals(
+ Arrays.asList("p-f1-s", "p-f2-s", "p-f3-s"),
+
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+ Assertions.assertIterableEquals(
+ Arrays.asList("p-f1-s"),
+
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+ outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .forEach(
+ key ->
+ Assertions.assertIterableEquals(
+ Arrays.asList("p-f2-s", "p-f3-s"),
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList())));
+ Assertions.assertEquals("p-f4-s",
outputAddEvent.getColumn().getName());
+ Assertions.assertEquals("p-f4-s",
outputModifyEvent.getColumn().getName());
+ Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+ Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+ Assertions.assertEquals("p-f5-s", outputDropEvent.getColumn());
+
+ config =
+ new FieldRenameConfig()
+ .setReplacementsWithRegex(
+ Arrays.asList(
+ new
FieldRenameConfig.ReplacementsWithRegex(
+ "f1", "t1", true),
+ new
FieldRenameConfig.ReplacementsWithRegex(
+ "f1", "t2", true)));
+ transform = new FieldRenameTransform(config, DEFAULT_TABLE);
+ outputCatalogTable = transform.getProducedCatalogTable();
+ outputAddEvent = (AlterTableAddColumnEvent)
transform.mapSchemaChangeEvent(addColumnEvent);
+ outputModifyEvent =
+ (AlterTableModifyColumnEvent)
transform.mapSchemaChangeEvent(modifyColumnEvent);
+ outputChangeEvent =
+ (AlterTableChangeColumnEvent)
transform.mapSchemaChangeEvent(changeColumnEvent);
+ outputDropEvent =
+ (AlterTableDropColumnEvent)
transform.mapSchemaChangeEvent(dropColumnEvent);
+ Assertions.assertIterableEquals(
+ Arrays.asList("t2", "f2", "f3"),
+
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+ Assertions.assertIterableEquals(
+ Arrays.asList("t2"),
+
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+ outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .forEach(
+ key ->
+ Assertions.assertIterableEquals(
+ Arrays.asList("f2", "f3"),
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList())));
+ Assertions.assertEquals("f4", outputAddEvent.getColumn().getName());
+ Assertions.assertEquals("f4", outputModifyEvent.getColumn().getName());
+ Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+ Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+ Assertions.assertEquals("f5", outputDropEvent.getColumn());
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java
new file mode 100644
index 0000000000..d1d756922e
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class TableRenameTransformTest {
+
+ private static final CatalogTable DEFAULT_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("test", "Database-x", "Schema-x",
"Table-x"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ null,
+ false,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f2",
+ BasicType.LONG_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f3",
+ BasicType.LONG_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .primaryKey(PrimaryKey.of("pk1",
Arrays.asList("f1")))
+ .constraintKey(
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType.UNIQUE_KEY,
+ "uk1",
+ Arrays.asList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "f2",
ConstraintKey.ColumnSortType.ASC),
+
ConstraintKey.ConstraintKeyColumn.of(
+ "f3",
+
ConstraintKey.ColumnSortType.ASC))))
+ .build(),
+ Collections.emptyMap(),
+ Collections.singletonList("f2"),
+ null);
+
+ @Test
+ public void testRename() {
+ SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {1L, 1L, 1L});
+ inputRow.setTableId(DEFAULT_TABLE.getTablePath().getFullName());
+ AlterTableAddColumnEvent inputEvent =
+ AlterTableAddColumnEvent.add(
+ DEFAULT_TABLE.getTableId(),
+ PhysicalColumn.of("f4", BasicType.LONG_TYPE, null,
null, true, null, null));
+
+ TableRenameConfig config = new
TableRenameConfig().setConvertCase(ConvertCase.LOWER);
+
+ TableRenameTransform transform = new TableRenameTransform(config,
DEFAULT_TABLE);
+ List<CatalogTable> outputCatalogTable =
transform.getProducedCatalogTables();
+ SeaTunnelRow outputRow = transform.map(inputRow);
+ SchemaChangeEvent outputEvent =
transform.mapSchemaChangeEvent(inputEvent);
+ Assertions.assertEquals(
+ "database-x.schema-x.table-x",
+
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+ Assertions.assertEquals("database-x.schema-x.table-x",
outputRow.getTableId());
+ Assertions.assertEquals(
+ "database-x.schema-x.table-x",
outputEvent.tablePath().getFullName());
+
+ config = new TableRenameConfig().setConvertCase(ConvertCase.UPPER);
+ transform = new TableRenameTransform(config, DEFAULT_TABLE);
+ outputCatalogTable = transform.getProducedCatalogTables();
+ outputRow = transform.map(inputRow);
+ outputEvent = transform.mapSchemaChangeEvent(inputEvent);
+ Assertions.assertEquals(
+ "DATABASE-X.SCHEMA-X.TABLE-X",
+
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+ Assertions.assertEquals("DATABASE-X.SCHEMA-X.TABLE-X",
outputRow.getTableId());
+ Assertions.assertEquals(
+ "DATABASE-X.SCHEMA-X.TABLE-X",
outputEvent.tablePath().getFullName());
+
+ config = new
TableRenameConfig().setPrefix("user-").setSuffix("-table");
+ transform = new TableRenameTransform(config, DEFAULT_TABLE);
+ outputCatalogTable = transform.getProducedCatalogTables();
+ outputRow = transform.map(inputRow);
+ outputEvent = transform.mapSchemaChangeEvent(inputEvent);
+ Assertions.assertEquals(
+ "Database-x.Schema-x.user-Table-x-table",
+
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+ Assertions.assertEquals("Database-x.Schema-x.user-Table-x-table",
outputRow.getTableId());
+ Assertions.assertEquals(
+ "Database-x.Schema-x.user-Table-x-table",
outputEvent.tablePath().getFullName());
+
+ config =
+ new TableRenameConfig()
+ .setReplacementsWithRegex(
+ Arrays.asList(
+ new
TableRenameConfig.ReplacementsWithRegex("Table", "t1"),
+ new
TableRenameConfig.ReplacementsWithRegex(
+ "Table", "t2")));
+ transform = new TableRenameTransform(config, DEFAULT_TABLE);
+ outputCatalogTable = transform.getProducedCatalogTables();
+ outputRow = transform.map(inputRow);
+ outputEvent = transform.mapSchemaChangeEvent(inputEvent);
+ Assertions.assertEquals(
+ "Database-x.Schema-x.t2-x",
+
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+ Assertions.assertEquals("Database-x.Schema-x.t2-x",
outputRow.getTableId());
+ Assertions.assertEquals("Database-x.Schema-x.t2-x",
outputEvent.tablePath().getFullName());
+ }
+}