This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e4e12245b9 [Feature][transforms-v2] Support append only stream from 
cdc source (#7763)
e4e12245b9 is described below

commit e4e12245b90e994bce27ea90ac044e722888a8ae
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Oct 14 19:25:44 2024 +0800

    [Feature][transforms-v2] Support append only stream from cdc source (#7763)
---
 docs/en/transform-v2/rowkind-extractor.md          | 113 +++++++++++++++
 docs/zh/transform-v2/rowkind-extractor.md          | 112 +++++++++++++++
 plugin-mapping.properties                          |   5 +-
 .../transform/TestRowKindExtractorTransformIT.java |  40 ++++++
 .../rowkind_extractor_transform_case1.conf         | 119 ++++++++++++++++
 .../rowkind_extractor_transform_case2.conf         | 118 ++++++++++++++++
 .../common/SingleFieldOutputTransform.java         |   8 ++
 .../rowkind/RowKindExtractorTransform.java         |  97 +++++++++++++
 .../rowkind/RowKindExtractorTransformConfig.java   |  45 ++++++
 .../rowkind/RowKindExtractorTransformFactory.java  |  48 +++++++
 .../rowkind/RowKindExtractorTransformType.java     |  23 ++++
 .../RowKindExtractorTransformFactoryTest.java      |  33 +++++
 .../rowkind/RowKindExtractorTransformTest.java     | 153 +++++++++++++++++++++
 13 files changed, 912 insertions(+), 2 deletions(-)

diff --git a/docs/en/transform-v2/rowkind-extractor.md 
b/docs/en/transform-v2/rowkind-extractor.md
new file mode 100644
index 0000000000..fdd1f411a8
--- /dev/null
+++ b/docs/en/transform-v2/rowkind-extractor.md
@@ -0,0 +1,113 @@
+# RowKindExtractor
+
+> RowKindExtractor transform plugin
+
+## Description
+
+transform cdc row to append only row that contains the cdc RowKind. <br />
+Example: <br />
+CDC row: -D 1, test1, test2 <br />
+transformed Row: +I 1,test1,test2,DELETE
+
+## Options
+
+| name              | type   | required | default value |
+|-------------------|--------|----------|---------------|
+| custom_field_name | string | yes      | row_kind      |
+| transform_type    | enum   | yes      | SHORT         |
+
+### custom_field_name [string]
+
+Custom field name of the RowKind field 
+
+### transform_type [enum]
+
+the RowKind field value formatting , the option can be `SHORT` or `FULL`
+
+`SHORT` : +I, -U , +U, -D
+`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+
+## Examples
+
+
+```yaml
+
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        schema = {
+            fields {
+                pk_id = bigint
+                name = string
+                score = int
+            }
+            primaryKey {
+                name = "pk_id"
+                columnNames = [pk_id]
+            }
+        }
+        rows = [
+            {
+                kind = INSERT
+                fields = [1, "A", 100]
+            },
+            {
+                kind = INSERT
+                fields = [2, "B", 100]
+            },
+            {
+                kind = INSERT
+                fields = [3, "C", 100]
+            },
+            {
+                kind = INSERT
+                fields = [4, "D", 100]
+            },
+            {
+                kind = UPDATE_BEFORE
+                fields = [1, "A", 100]
+            },
+            {
+                kind = UPDATE_AFTER
+                fields = [1, "F", 100]
+            }
+            {
+                kind = UPDATE_BEFORE
+                fields = [2, "B", 100]
+            },
+            {
+                kind = UPDATE_AFTER
+                fields = [2, "G", 100]
+            },
+            {
+                kind = DELETE
+                fields = [3, "C", 100]
+            },
+            {
+                kind = DELETE
+                fields = [4, "D", 100]
+            }
+        ]
+    }
+}
+
+transform {
+  RowKindExtractor {
+        custom_field_name = "custom_name"
+        transform_type = FULL
+        result_table_name = "trans_result"
+    }
+}
+
+sink {
+  Console {
+    source_table_name = "custom_name"
+  }
+}
+
+```
+
diff --git a/docs/zh/transform-v2/rowkind-extractor.md 
b/docs/zh/transform-v2/rowkind-extractor.md
new file mode 100644
index 0000000000..2349450005
--- /dev/null
+++ b/docs/zh/transform-v2/rowkind-extractor.md
@@ -0,0 +1,112 @@
+# RowKindExtractor
+
+> RowKindExtractor transform plugin
+
+## Description
+
+将CDC Row 转换为 Append only Row, 转换后的行扩展了RowKind字段 <br />
+Example: <br />
+CDC row: -D 1, test1, test2 <br />
+transformed Row: +I 1,test1,test2,DELETE
+
+## Options
+
+| name              | type   | required | default value |
+|-------------------|--------|----------|---------------|
+| custom_field_name | string | yes      | row_kind      |
+| transform_type    | enum   | yes      | SHORT         |
+
+### custom_field_name [string]
+
+RowKind列的自定义名
+
+### transform_type [enum]
+
+格式化RowKind值 , 配置为 `SHORT` 或 `FULL`
+
+`SHORT` : +I, -U , +U, -D
+`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+
+## Examples
+
+```yaml
+
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        schema = {
+            fields {
+                pk_id = bigint
+                name = string
+                score = int
+            }
+            primaryKey {
+                name = "pk_id"
+                columnNames = [pk_id]
+            }
+        }
+        rows = [
+            {
+                kind = INSERT
+                fields = [1, "A", 100]
+            },
+            {
+                kind = INSERT
+                fields = [2, "B", 100]
+            },
+            {
+                kind = INSERT
+                fields = [3, "C", 100]
+            },
+            {
+                kind = INSERT
+                fields = [4, "D", 100]
+            },
+            {
+                kind = UPDATE_BEFORE
+                fields = [1, "A", 100]
+            },
+            {
+                kind = UPDATE_AFTER
+                fields = [1, "F", 100]
+            }
+            {
+                kind = UPDATE_BEFORE
+                fields = [2, "B", 100]
+            },
+            {
+                kind = UPDATE_AFTER
+                fields = [2, "G", 100]
+            },
+            {
+                kind = DELETE
+                fields = [3, "C", 100]
+            },
+            {
+                kind = DELETE
+                fields = [4, "D", 100]
+            }
+        ]
+    }
+}
+
+transform {
+  RowKindExtractor {
+        custom_field_name = "custom_name"
+        transform_type = FULL
+        result_table_name = "trans_result"
+    }
+}
+
+sink {
+  Console {
+    source_table_name = "custom_name"
+  }
+}
+
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 86c95bc3e2..e314ef8661 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -135,9 +135,11 @@ seatunnel.sink.ActiveMQ = connector-activemq
 seatunnel.source.Qdrant = connector-qdrant
 seatunnel.sink.Qdrant = connector-qdrant
 seatunnel.source.Sls = connector-sls
+seatunnel.sink.Sls = connector-sls
 seatunnel.source.Typesense = connector-typesense
 seatunnel.sink.Typesense = connector-typesense
 seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
+
 seatunnel.transform.Sql = seatunnel-transforms-v2
 seatunnel.transform.FieldMapper = seatunnel-transforms-v2
 seatunnel.transform.Filter = seatunnel-transforms-v2
@@ -149,5 +151,4 @@ seatunnel.transform.Copy = seatunnel-transforms-v2
 seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
 seatunnel.transform.LLM = seatunnel-transforms-v2
 seatunnel.transform.Embedding = seatunnel-transforms-v2
-seatunnel.sink.Sls = connector-sls
-
+seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java
new file mode 100644
index 0000000000..7f4c3fd674
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestRowKindExtractorTransformIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void testRowKindExtractorTransform(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult1 =
+                
container.executeJob("/rowkind_extractor_transform_case1.conf");
+        Assertions.assertEquals(0, execResult1.getExitCode());
+        Container.ExecResult execResult2 =
+                
container.executeJob("/rowkind_extractor_transform_case2.conf");
+        Assertions.assertEquals(0, execResult2.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case1.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case1.conf
new file mode 100644
index 0000000000..fd9efaafa6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case1.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [4, "D", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "F", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [2, "B", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [2, "G", 100]
+      },
+      {
+        kind = DELETE
+        fields = [3, "C", 100]
+      },
+      {
+        kind = DELETE
+        fields = [4, "D", 100]
+      }
+    ]
+  }
+}
+
+transform {
+  RowKindExtractor {
+    custom_field_name = "custom_name"
+    transform_type = FULL
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 10
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 10
+        }
+      ]
+      field_rules = [
+        {
+          field_name = custom_name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case2.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case2.conf
new file mode 100644
index 0000000000..fc7c342c5e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case2.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [4, "D", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "F", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [2, "B", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [2, "G", 100]
+      },
+      {
+        kind = DELETE
+        fields = [3, "C", 100]
+      },
+      {
+        kind = DELETE
+        fields = [4, "D", 100]
+      }
+    ]
+  }
+}
+
+transform {
+  RowKindExtractor {
+    transform_type = SHORT
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 10
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 10
+        }
+      ]
+      field_rules = [
+        {
+          field_name = row_kind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index c0ba3d1ca9..13d25989ac 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -137,4 +137,12 @@ public abstract class SingleFieldOutputTransform extends 
AbstractCatalogSupportT
     }
 
     protected abstract Column getOutputColumn();
+
+    public int getFieldIndex() {
+        return fieldIndex;
+    }
+
+    public SeaTunnelRowContainerGenerator getRowContainerGenerator() {
+        return rowContainerGenerator;
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
new file mode 100644
index 0000000000..ae7b1fcaa1
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.NonNull;
+
+import java.util.Arrays;
+
+public class RowKindExtractorTransform extends SingleFieldOutputTransform {
+
+    private final ReadonlyConfig config;
+
+    private final RowKindExtractorTransformType transformType;
+
+    public RowKindExtractorTransform(
+            @NonNull ReadonlyConfig config, @NonNull CatalogTable 
inputCatalogTable) {
+        super(inputCatalogTable);
+        this.config = config;
+        this.transformType = 
config.get(RowKindExtractorTransformConfig.TRANSFORM_TYPE);
+    }
+
+    @Override
+    public String getPluginName() {
+        return RowKindExtractorTransformConfig.PLUGIN_NAME;
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        Object fieldValue = getOutputFieldValue(new 
SeaTunnelRowAccessor(inputRow));
+        inputRow.setRowKind(RowKind.INSERT);
+        SeaTunnelRow outputRow = getRowContainerGenerator().apply(inputRow);
+        outputRow.setField(getFieldIndex(), fieldValue);
+        return outputRow;
+    }
+
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        switch (transformType) {
+            case SHORT:
+                return inputRow.getRowKind().shortString();
+            case FULL:
+                return inputRow.getRowKind().name();
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unsupported transform type %s", 
transformType));
+        }
+    }
+
+    @Override
+    protected Column getOutputColumn() {
+        String customFieldName = 
config.get(RowKindExtractorTransformConfig.CUSTOM_FIELD_NAME);
+        String[] fieldNames = 
inputCatalogTable.getTableSchema().getFieldNames();
+        boolean isExist = Arrays.asList(fieldNames).contains(customFieldName);
+        if (isExist) {
+            throw new IllegalArgumentException(
+                    String.format("field name %s already exists", 
customFieldName));
+        }
+        return PhysicalColumn.of(
+                customFieldName,
+                BasicType.STRING_TYPE,
+                13L,
+                false,
+                RowKind.INSERT.shortString(),
+                "Output column of RowKind");
+    }
+
+    @VisibleForTesting
+    public void initRowContainerGenerator() {
+        transformTableSchema();
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformConfig.java
new file mode 100644
index 0000000000..017e6292a9
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+@Getter
+@Setter
+public class RowKindExtractorTransformConfig implements Serializable {
+
+    public static final String PLUGIN_NAME = "RowKindExtractor";
+
+    public static final Option<String> CUSTOM_FIELD_NAME =
+            Options.key("custom_field_name")
+                    .stringType()
+                    .defaultValue("row_kind")
+                    .withDescription("Custom field name of the RowKind field");
+
+    public static final Option<RowKindExtractorTransformType> TRANSFORM_TYPE =
+            Options.key("transform_type")
+                    .enumType(RowKindExtractorTransformType.class)
+                    .defaultValue(RowKindExtractorTransformType.SHORT)
+                    .withDescription("transform RowKind field value format");
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformFactory.java
new file mode 100644
index 0000000000..7228336da7
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RowKindExtractorTransformFactory implements TableTransformFactory 
{
+    @Override
+    public String factoryIdentifier() {
+        return RowKindExtractorTransformConfig.PLUGIN_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(RowKindExtractorTransformConfig.CUSTOM_FIELD_NAME)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
+        return () -> new RowKindExtractorTransform(context.getOptions(), 
catalogTable);
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformType.java
new file mode 100644
index 0000000000..f3eb39d1fe
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+public enum RowKindExtractorTransformType {
+    SHORT,
+    FULL
+}
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RowKindExtractorTransformFactoryTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RowKindExtractorTransformFactoryTest.java
new file mode 100644
index 0000000000..fcc73cc810
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RowKindExtractorTransformFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.transform.rowkind.RowKindExtractorTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RowKindExtractorTransformFactoryTest {
+
+    @Test
+    public void testOptionRule() throws Exception {
+        RowKindExtractorTransformFactory replaceTransformFactory =
+                new RowKindExtractorTransformFactory();
+        Assertions.assertNotNull(replaceTransformFactory.optionRule());
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformTest.java
new file mode 100644
index 0000000000..b958dab7d0
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+class RowKindExtractorTransformTest {
+
+    static CatalogTable catalogTable;
+
+    static Object[] values;
+
+    static SeaTunnelRow inputRow;
+
+    @BeforeAll
+    static void setUp() {
+        catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("catalog", TablePath.DEFAULT),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key1",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key2",
+                                                BasicType.INT_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key3",
+                                                BasicType.LONG_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key4",
+                                                BasicType.DOUBLE_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key5",
+                                                BasicType.FLOAT_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .build(),
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "comment");
+        values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 
3.14};
+        inputRow = new SeaTunnelRow(values);
+    }
+
+    @Test
+    void testCdcRowTransformShort() {
+        RowKindExtractorTransform rowKindExtractorTransform =
+                new RowKindExtractorTransform(
+                        ReadonlyConfig.fromMap(new HashMap<>()), catalogTable);
+        rowKindExtractorTransform.initRowContainerGenerator();
+        SeaTunnelRow insertRow = inputRow.copy();
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, +I]}",
+                rowKindExtractorTransform.transformRow(insertRow).toString());
+        SeaTunnelRow updateBeforeRow = inputRow.copy();
+        updateBeforeRow.setRowKind(RowKind.UPDATE_BEFORE);
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, -U]}",
+                
rowKindExtractorTransform.transformRow(updateBeforeRow).toString());
+        SeaTunnelRow updateAfterRow = inputRow.copy();
+        updateAfterRow.setRowKind(RowKind.UPDATE_AFTER);
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, +U]}",
+                
rowKindExtractorTransform.transformRow(updateAfterRow).toString());
+        SeaTunnelRow deleteRow = inputRow.copy();
+        deleteRow.setRowKind(RowKind.DELETE);
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, -D]}",
+                rowKindExtractorTransform.transformRow(deleteRow).toString());
+    }
+
+    @Test
+    void testCdcRowTransformFull() {
+        HashMap<String, Object> conf = new HashMap<>();
+        conf.put("transform_type", "FULL");
+        RowKindExtractorTransform rowKindExtractorTransform =
+                new RowKindExtractorTransform(ReadonlyConfig.fromMap(conf), 
catalogTable);
+        rowKindExtractorTransform.initRowContainerGenerator();
+        SeaTunnelRow insertRow = inputRow.copy();
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, INSERT]}",
+                rowKindExtractorTransform.transformRow(insertRow).toString());
+        SeaTunnelRow updateBeforeRow = inputRow.copy();
+        updateBeforeRow.setRowKind(RowKind.UPDATE_BEFORE);
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, UPDATE_BEFORE]}",
+                
rowKindExtractorTransform.transformRow(updateBeforeRow).toString());
+        SeaTunnelRow updateAfterRow = inputRow.copy();
+        updateAfterRow.setRowKind(RowKind.UPDATE_AFTER);
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, UPDATE_AFTER]}",
+                
rowKindExtractorTransform.transformRow(updateAfterRow).toString());
+        SeaTunnelRow deleteRow = inputRow.copy();
+        deleteRow.setRowKind(RowKind.DELETE);
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 
896657703886127105, 3.1415916, 3.14, DELETE]}",
+                rowKindExtractorTransform.transformRow(deleteRow).toString());
+    }
+}


Reply via email to