This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 699d16552a [Feature][Transform-v2] Add metadata transform (#7899)
699d16552a is described below

commit 699d16552a4d6ca900503fe98a1bc184718574f2
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Nov 11 20:48:39 2024 +0800

    [Feature][Transform-v2] Add metadata transform (#7899)
---
 docs/en/transform-v2/dynamic-compile.md            |   4 +-
 docs/en/transform-v2/metadata.md                   |  85 +++++++++++
 docs/zh/transform-v2/dynamic-compile.md            |   4 +-
 docs/zh/transform-v2/metadata.md                   |  85 +++++++++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/api/table/type/CommonOptions.java    |  36 ++++-
 .../seatunnel/api/table/type/MetadataUtil.java     |  93 ++++++++++++
 .../seatunnel/api/table/type/SeaTunnelRow.java     |   4 +-
 .../api/table/type}/SeaTunnelRowAccessor.java      |  11 +-
 .../cdc/base/utils/SourceRecordUtils.java          |   7 +-
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java |  16 ++-
 .../MongoDBConnectorDeserializationSchema.java     |  17 ++-
 .../connector-cdc-mongodb-e2e/pom.xml              |  14 ++
 .../src/test/java/mongodb/MongodbCDCIT.java        |  45 +++++-
 .../test/resources/mongodbcdc_metadata_trans.conf  | 104 ++++++++++++++
 .../connector-cdc-mysql-e2e/pom.xml                |  14 ++
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  39 +++++
 .../test/resources/mysqlcdc_to_metadata_trans.conf | 103 +++++++++++++
 .../connector-cdc-opengauss-e2e/pom.xml            |  14 ++
 .../seatunnel/cdc/postgres/OpengaussCDCIT.java     |  42 ++++++
 .../resources/opengausscdc_to_meatadata_trans.conf | 105 ++++++++++++++
 .../connector-cdc-oracle-e2e/pom.xml               |  15 ++
 .../seatunnel/cdc/oracle/OracleCDCIT.java          |  42 ++++++
 .../resources/oraclecdc_to_metadata_trans.conf     | 119 +++++++++++++++
 .../connector-cdc-postgres-e2e/pom.xml             |  14 ++
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      |  43 ++++++
 .../resources/postgrescdc_to_metadata_trans.conf   | 105 ++++++++++++++
 .../connector-cdc-sqlserver-e2e/pom.xml            |  15 ++
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    |  39 +++++
 .../resources/sqlservercdc_to_metadata_trans.conf  | 110 ++++++++++++++
 .../e2e/common/container/TestContainer.java        |   4 +
 .../container/seatunnel/SeaTunnelContainer.java    |  24 +++-
 .../src/test/resources/seatunnel.yaml              |   4 +-
 ...ixed_dynamic_groovy_java_compile_transform.conf |   6 +-
 .../multiple_dynamic_groovy_compile_transform.conf |   6 +-
 .../multiple_dynamic_java_compile_transform.conf   |   6 +-
 .../single_dynamic_groovy_compile_transform.conf   |   4 +-
 .../single_dynamic_http_compile_transform.conf     |   4 +-
 .../single_dynamic_java_compile_transform.conf     |   4 +-
 .../dynamic_compile/source_file/GroovyFile         |   4 +-
 .../resources/dynamic_compile/source_file/JavaFile |   2 +-
 .../common/MultipleFieldOutputTransform.java       |   1 +
 .../transform/common/SeaTunnelRowAccessor.java     |   1 +
 .../common/SingleFieldOutputTransform.java         |   1 +
 .../transform/copy/CopyFieldTransform.java         |   2 +-
 .../dynamiccompile/DynamicCompileTransform.java    |   2 +-
 .../transform/exception/TransformCommonError.java  |  16 +++
 .../exception/TransformCommonErrorCode.java        |   8 +-
 .../transform/jsonpath/JsonPathTransform.java      |   2 +-
 .../transform/metadata/MetadataTransform.java      | 159 +++++++++++++++++++++
 .../MetadataTransformConfig.java}                  |  38 ++---
 .../metadata/MetadataTransformFactory.java         |  44 ++++++
 .../nlpmodel/embadding/EmbeddingTransform.java     |   2 +-
 .../transform/nlpmodel/llm/LLMTransform.java       |   2 +-
 .../transform/replace/ReplaceTransform.java        |   2 +-
 .../rowkind/RowKindExtractorTransform.java         |   2 +-
 .../seatunnel/transform/split/SplitTransform.java  |   2 +-
 .../transform/metadata/MetadataTransformTest.java  | 132 +++++++++++++++++
 58 files changed, 1752 insertions(+), 77 deletions(-)

diff --git a/docs/en/transform-v2/dynamic-compile.md 
b/docs/en/transform-v2/dynamic-compile.md
index fb5500880a..66b7ba1f83 100644
--- a/docs/en/transform-v2/dynamic-compile.md
+++ b/docs/en/transform-v2/dynamic-compile.md
@@ -88,7 +88,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                  import org.apache.seatunnel.api.table.catalog.CatalogTable
                  import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                  import org.apache.seatunnel.api.table.type.*;
@@ -146,7 +146,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column;
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                  import org.apache.seatunnel.api.table.catalog.*;
                  import org.apache.seatunnel.api.table.type.*;
                  import java.util.ArrayList;
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
new file mode 100644
index 0000000000..23f593ee0a
--- /dev/null
+++ b/docs/en/transform-v2/metadata.md
@@ -0,0 +1,85 @@
+# Metadata
+
+> Metadata transform plugin
+
+## Description
+Metadata transform plugin for adding metadata fields to data
+
+## Available Metadata
+
+|    Key    | DataType | Description                                           
                                             |
+|:---------:|:--------:|:---------------------------------------------------------------------------------------------------|
+| Database  |  string  | Name of the table that contain the row.               
                                             |
+|   Table   |  string  | Name of the table that contain the row.               
                                             |
+|  RowKind  |  string  | The type of operation                                 
                                             |
+| EventTime |   Long   | The time at which the connector processed the event.  
                                             |
+|   Delay   |   Long   | The difference between data extraction time and 
database change time                               |
+| Partition |  string  | Contains the partition field of the corresponding 
number table of the row, multiple using `,` join |
+
+### note
+    `Delay` `Partition` only worked on cdc series connectors for now , except 
TiDB-CDC
+
+## Options
+
+|      name       | type | required | default value | Description              
                                                 |
+|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
+| metadata_fields | map  | yes      |               | A mapping metadata input 
fields and their corresponding output fields.    |
+
+### metadata_fields [map]
+
+A mapping between metadata fields and their respective output fields. 
+
+```hocon
+metadata_fields {
+  Database = c_database
+  Table = c_table
+  RowKind = c_rowKind
+  EventTime = c_ts_ms
+  Delay = c_delay
+}
+```
+
+## Examples
+
+```yaml
+
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    read_limit.bytes_per_second = 7000000
+    read_limit.rows_per_second = 400
+}
+
+source {
+    MySQL-CDC {
+        result_table_name = "customers_mysql_cdc"
+        server-id = 5652
+        username = "root"
+        password = "zdyk_Dev@2024"
+        table-names = ["source.user"]
+        base-url = "jdbc:mysql://172.16.17.123:3306/source"
+    }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "custom_name"
+  }
+}
+
+```
+
diff --git a/docs/zh/transform-v2/dynamic-compile.md 
b/docs/zh/transform-v2/dynamic-compile.md
index c5af808d4e..4db9b86d73 100644
--- a/docs/zh/transform-v2/dynamic-compile.md
+++ b/docs/zh/transform-v2/dynamic-compile.md
@@ -85,7 +85,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                  import org.apache.seatunnel.api.table.catalog.CatalogTable
                  import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                  import org.apache.seatunnel.api.table.type.*;
@@ -143,7 +143,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column;
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                  import org.apache.seatunnel.api.table.catalog.*;
                  import org.apache.seatunnel.api.table.type.*;
                  import java.util.ArrayList;
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
new file mode 100644
index 0000000000..09a743adf0
--- /dev/null
+++ b/docs/zh/transform-v2/metadata.md
@@ -0,0 +1,85 @@
+# Metadata
+
+> Metadata transform plugin
+
+## Description
+元数据转换插件,用于将元数据字段添加到数据中
+
+## 支持的元数据
+
+|    Key    | DataType |       Description       |
+|:---------:|:--------:|:-----------------------:|
+| Database  |  string  |        包含该行的数据库名        |
+|   Table   |  string  |        包含该行的数表名         |
+|  RowKind  |  string  |           行类型           |
+| EventTime |   Long   |                         |
+|   Delay   |   Long   |    数据抽取时间与数据库变更时间的差     |
+| Partition |  string  | 包含该行对应数表的分区字段,多个使用`,`连接 |
+
+### 注意事项
+    `Delay` `Partition`目前只适用于cdc系列连接器,除外TiDB-CDC
+
+## 配置选项
+
+|      name       | type | required | default value | Description       |
+|:---------------:|------|:--------:|:-------------:|-------------------|
+| metadata_fields | map  |    是     |       -       | 元数据字段与输入字段相应的映射关系 |
+
+### metadata_fields [map]
+
+元数据字段和相应的输出字段之间的映射关系
+
+```hocon
+metadata_fields {
+  database = c_database
+  table = c_table
+  rowKind = c_rowKind
+  ts_ms = c_ts_ms
+  delay = c_delay
+}
+```
+
+## 示例
+
+```yaml
+
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+    checkpoint.interval = 5000
+    read_limit.bytes_per_second = 7000000
+    read_limit.rows_per_second = 400
+}
+
+source {
+    MySQL-CDC {
+        result_table_name = "customers_mysql_cdc"
+        server-id = 5652
+        username = "root"
+        password = "zdyk_Dev@2024"
+        table-names = ["source.user"]
+        base-url = "jdbc:mysql://172.16.17.123:3306/source"
+    }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+        Database = database
+        Table = table
+        RowKind = rowKind
+        EventTime = ts_ms
+        Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Console {
+  source_table_name = "custom_name"
+  }
+}
+
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 82c941b70f..c494686161 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -154,3 +154,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
 seatunnel.transform.LLM = seatunnel-transforms-v2
 seatunnel.transform.Embedding = seatunnel-transforms-v2
 seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
+seatunnel.transform.Metadata = seatunnel-transforms-v2
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
index 839d611132..8b5b36682a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
@@ -30,18 +30,44 @@ public enum CommonOptions {
     /**
      * The key of {@link Column#getOptions()} to specify the column value is a 
json format string.
      */
-    JSON("Json"),
+    JSON("Json", false),
     /** The key of {@link Column#getOptions()} to specify the column value is 
a metadata field. */
-    METADATA("Metadata"),
+    METADATA("Metadata", false),
     /**
      * The key of {@link SeaTunnelRow#getOptions()} to store the partition 
value of the row value.
      */
-    PARTITION("Partition"),
-    ;
+    PARTITION("Partition", true),
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the DATABASE 
value of the row value.
+     */
+    DATABASE("Database", true),
+    /** The key of {@link SeaTunnelRow#getOptions()} to store the TABLE value 
of the row value. */
+    TABLE("Table", true),
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the ROW_KIND 
value of the row value.
+     */
+    ROW_KIND("RowKind", true),
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME 
value of the row value.
+     */
+    EVENT_TIME("EventTime", true),
+    /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value 
of the row value. */
+    DELAY("Delay", true);
 
     private final String name;
+    private final boolean supportMetadataTrans;
 
-    CommonOptions(String name) {
+    CommonOptions(String name, boolean supportMetadataTrans) {
         this.name = name;
+        this.supportMetadataTrans = supportMetadataTrans;
+    }
+
+    public static CommonOptions fromName(String name) {
+        for (CommonOptions option : CommonOptions.values()) {
+            if (option.getName().equals(name)) {
+                return option;
+            }
+        }
+        throw new IllegalArgumentException("Unknown option name: " + name);
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
new file mode 100644
index 0000000000..42ab203576
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
+import static org.apache.seatunnel.api.table.type.CommonOptions.PARTITION;
+
+public class MetadataUtil {
+
+    public static final List<String> METADATA_FIELDS;
+
+    static {
+        METADATA_FIELDS = new ArrayList<>();
+        Stream.of(CommonOptions.values())
+                .filter(CommonOptions::isSupportMetadataTrans)
+                .map(CommonOptions::getName)
+                .forEach(METADATA_FIELDS::add);
+    }
+
+    public static void setDelay(SeaTunnelRow row, Long delay) {
+        row.getOptions().put(DELAY.getName(), delay);
+    }
+
+    public static void setPartition(SeaTunnelRow row, String[] partition) {
+        row.getOptions().put(PARTITION.getName(), partition);
+    }
+
+    public static void setEventTime(SeaTunnelRow row, Long delay) {
+        row.getOptions().put(EVENT_TIME.getName(), delay);
+    }
+
+    public static Long getDelay(SeaTunnelRowAccessor row) {
+        return (Long) row.getOptions().get(DELAY.getName());
+    }
+
+    public static String getDatabase(SeaTunnelRowAccessor row) {
+        if (row.getTableId() == null) {
+            return null;
+        }
+        return TablePath.of(row.getTableId()).getDatabaseName();
+    }
+
+    public static String getTable(SeaTunnelRowAccessor row) {
+        if (row.getTableId() == null) {
+            return null;
+        }
+        return TablePath.of(row.getTableId()).getTableName();
+    }
+
+    public static String getRowKind(SeaTunnelRowAccessor row) {
+        return row.getRowKind().shortString();
+    }
+
+    public static String getPartitionStr(SeaTunnelRowAccessor row) {
+        Object partition = row.getOptions().get(PARTITION.getName());
+        return Objects.nonNull(partition) ? String.join(",", (String[]) 
partition) : null;
+    }
+
+    public static String[] getPartition(SeaTunnelRowAccessor row) {
+        return (String[]) row.getOptions().get(PARTITION.getName());
+    }
+
+    public static Long getEventTime(SeaTunnelRowAccessor row) {
+        return (Long) row.getOptions().get(EVENT_TIME.getName());
+    }
+
+    public static boolean isMetadataField(String fieldName) {
+        return METADATA_FIELDS.contains(fieldName);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index b6da4eea7b..84e172f2df 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -34,10 +34,10 @@ public final class SeaTunnelRow implements Serializable {
     /** The array to store the actual internal format values. */
     private final Object[] fields;
 
-    private volatile int size;
-
     private Map<String, Object> options;
 
+    private volatile int size;
+
     public SeaTunnelRow(int arity) {
         this.fields = new Object[arity];
     }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java
similarity index 89%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java
index 5b97f34168..6bbca49cd5 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.common;
-
-import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+package org.apache.seatunnel.api.table.type;
 
 import lombok.AllArgsConstructor;
 
+import java.util.Map;
+
 @AllArgsConstructor
 public class SeaTunnelRowAccessor {
     private final SeaTunnelRow row;
@@ -45,4 +44,8 @@ public class SeaTunnelRowAccessor {
     public Object[] getFields() {
         return row.getFields();
     }
+
+    public Map<String, Object> getOptions() {
+        return row.getOptions();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
index f7e9577ddb..1df22ee428 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
@@ -67,10 +67,9 @@ public class SourceRecordUtils {
     }
 
     /**
-     * Return the timestamp when the change event is produced in MySQL.
-     *
-     * <p>The field `source.ts_ms` in {@link SourceRecord} data struct is the 
time when the change
-     * event is operated in MySQL.
+     * In the source object, ts_ms indicates the time that the change was made 
in the database. By
+     * comparing the value for payload.source.ts_ms with the value for 
payload.ts_ms, you can
+     * determine the lag between the source database update and Debezium.
      */
     public static Long getMessageTimestamp(SourceRecord record) {
         Schema schema = record.valueSchema();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 948e872d48..117e84325e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import 
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
 import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -181,26 +182,39 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
         } else {
             converters = tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
         }
-
+        Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
+        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
+        long delay = -1L;
+        if (fetchTimestamp != null && messageTimestamp != null) {
+            delay = fetchTimestamp - messageTimestamp;
+        }
         if (operation == Envelope.Operation.CREATE || operation == 
Envelope.Operation.READ) {
             SeaTunnelRow insert = extractAfterRow(converters, record, 
messageStruct, valueSchema);
             insert.setRowKind(RowKind.INSERT);
             insert.setTableId(tableId);
+            MetadataUtil.setDelay(insert, delay);
+            MetadataUtil.setEventTime(insert, fetchTimestamp);
             collector.collect(insert);
         } else if (operation == Envelope.Operation.DELETE) {
             SeaTunnelRow delete = extractBeforeRow(converters, record, 
messageStruct, valueSchema);
             delete.setRowKind(RowKind.DELETE);
             delete.setTableId(tableId);
+            MetadataUtil.setDelay(delete, delay);
+            MetadataUtil.setEventTime(delete, fetchTimestamp);
             collector.collect(delete);
         } else if (operation == Envelope.Operation.UPDATE) {
             SeaTunnelRow before = extractBeforeRow(converters, record, 
messageStruct, valueSchema);
             before.setRowKind(RowKind.UPDATE_BEFORE);
             before.setTableId(tableId);
+            MetadataUtil.setDelay(before, delay);
+            MetadataUtil.setEventTime(before, fetchTimestamp);
             collector.collect(before);
 
             SeaTunnelRow after = extractAfterRow(converters, record, 
messageStruct, valueSchema);
             after.setRowKind(RowKind.UPDATE_AFTER);
             after.setTableId(tableId);
+            MetadataUtil.setDelay(after, delay);
+            MetadataUtil.setEventTime(after, fetchTimestamp);
             collector.collect(after);
         } else {
             log.warn("Received {} operation, skip", operation);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 4811217cf4..8b76d202d7 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -24,12 +24,14 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
 
@@ -111,18 +113,27 @@ public class MongoDBConnectorDeserializationSchema
             log.debug("Ignore newly added table {}", tableId);
             return;
         }
-
+        Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
+        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
+        long delay = -1L;
+        if (fetchTimestamp != null && messageTimestamp != null) {
+            delay = fetchTimestamp - messageTimestamp;
+        }
         switch (op) {
             case INSERT:
                 SeaTunnelRow insert = extractRowData(tableRowConverter, 
fullDocument);
                 insert.setRowKind(RowKind.INSERT);
                 insert.setTableId(tableId);
+                MetadataUtil.setDelay(insert, delay);
+                MetadataUtil.setEventTime(insert, fetchTimestamp);
                 emit(record, insert, out);
                 break;
             case DELETE:
                 SeaTunnelRow delete = extractRowData(tableRowConverter, 
documentKey);
                 delete.setRowKind(RowKind.DELETE);
                 delete.setTableId(tableId);
+                MetadataUtil.setDelay(delete, delay);
+                MetadataUtil.setEventTime(delete, fetchTimestamp);
                 emit(record, delete, out);
                 break;
             case UPDATE:
@@ -132,12 +143,16 @@ public class MongoDBConnectorDeserializationSchema
                 SeaTunnelRow updateAfter = extractRowData(tableRowConverter, 
fullDocument);
                 updateAfter.setRowKind(RowKind.UPDATE_AFTER);
                 updateAfter.setTableId(tableId);
+                MetadataUtil.setDelay(updateAfter, delay);
+                MetadataUtil.setEventTime(updateAfter, fetchTimestamp);
                 emit(record, updateAfter, out);
                 break;
             case REPLACE:
                 SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, 
fullDocument);
                 replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
                 replaceAfter.setTableId(tableId);
+                MetadataUtil.setDelay(replaceAfter, delay);
+                MetadataUtil.setEventTime(replaceAfter, fetchTimestamp);
                 emit(record, replaceAfter, out);
                 break;
             case INVALIDATE:
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
index a8814c11ee..e985193498 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
@@ -67,5 +67,19 @@
             <version>${mysql.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 3789731354..7cf18c8032 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.bson.Document;
 import org.junit.jupiter.api.AfterAll;
@@ -45,6 +46,7 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.Sorts;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -148,7 +150,9 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
     }
 
     @TestTemplate
-    public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) {
+    public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
+            throws InterruptedException {
+        cleanSourceTable();
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
@@ -225,6 +229,45 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
                         });
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and 
manually canceling the canceled task, which is currently only supported by the 
zeta engine.")
+    public void testMongodbCdcMetadataTrans(TestContainer container) throws 
InterruptedException {
+        cleanSourceTable();
+        Long jobId = JobIdGenerator.newJobId();
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/mongodbcdc_metadata_trans.conf", 
String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException();
+                    }
+                    return null;
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        upsertDeleteSourceTable();
+        TimeUnit.SECONDS.sleep(20);
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+
+        try {
+            Container.ExecResult cancelJobResult = 
container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), 
cancelJobResult.getStderr());
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private Connection getJdbcConnection() throws SQLException {
         return DriverManager.getConnection(
                 MYSQL_CONTAINER.getJdbcUrl(),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
new file mode 100644
index 0000000000..bc6475359c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products"]
+    username = superuser
+    password = superpw
+    schema = {
+      fields {
+        "_id": string,
+        "name": string,
+        "description": string,
+        "weight": string
+      }
+    }
+  }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      field_rules = [
+        {
+          field_name = database
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = table
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = rowKind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = ts_ms
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = delay
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
index 539fce1890..8cb38aaffa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
@@ -54,6 +54,20 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mysql</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 2891907431..6f6467257f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -180,6 +180,45 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                         });
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and 
manually canceling the canceled task, which is currently only supported by the 
zeta engine.")
+    public void testMysqlCdcMetadataTrans(TestContainer container) throws 
InterruptedException {
+        // Clear related content to ensure that multiple operations are not 
affected
+        clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+        clearTable(MYSQL_DATABASE, SINK_TABLE);
+        Long jobId = JobIdGenerator.newJobId();
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/mysqlcdc_to_metadata_trans.conf", 
String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+        TimeUnit.SECONDS.sleep(10);
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+        try {
+            Container.ExecResult cancelJobResult = 
container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), 
cancelJobResult.getStderr());
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @TestTemplate
     public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer 
container) {
         // Clear related content to ensure that multiple operations are not 
affected
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..8787c8987d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second = 7000000
+  read_limit.rows_per_second = 400
+}
+
+source {
+  MySQL-CDC {
+    result_table_name = "customers_mysql_cdc"
+    server-id = 5652
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+  }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      field_rules = [
+        {
+          field_name = database
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = table
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = rowKind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = ts_ms
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = delay
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
index f95e5cdb1a..b855c0d6d5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
@@ -57,6 +57,20 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
index ed3fdd74b4..35be6c0d12 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -199,6 +200,47 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and 
manually canceling the canceled task, which is currently only supported by the 
zeta engine.")
+    public void testOpengaussCdcMeatadataTrans(TestContainer container)
+            throws InterruptedException, IOException {
+        try {
+            Long jobId = JobIdGenerator.newJobId();
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    "/opengausscdc_to_meatadata_trans.conf", 
String.valueOf(jobId));
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + 
e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+            TimeUnit.SECONDS.sleep(10);
+            // insert update delete
+            upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+
+            TimeUnit.SECONDS.sleep(20);
+            Awaitility.await()
+                    .atMost(2, TimeUnit.MINUTES)
+                    .untilAsserted(
+                            () -> {
+                                String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                                Assertions.assertEquals("RUNNING", jobStatus);
+                            });
+            Container.ExecResult cancelJobResult = 
container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), 
cancelJobResult.getStderr());
+        } finally {
+            clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+            clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+        }
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf
new file mode 100644
index 0000000000..9de18e661d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  Opengauss-CDC {
+    result_table_name = "customers_opengauss_cdc"
+    username = "gaussdb"
+    password = "openGauss@123"
+    database-names = ["opengauss_cdc"]
+    schema-names = ["inventory"]
+    table-names = ["opengauss_cdc.inventory.opengauss_cdc_table_1"]
+    base-url = 
"jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc?loggerLevel=OFF"
+    decoding.plugin.name = "pgoutput"
+  }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      field_rules = [
+        {
+          field_name = database
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = table
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = rowKind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = ts_ms
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = delay
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
index 01c9b2c756..0757d1128f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
@@ -56,6 +56,21 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>jdbc</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index d09b502d8f..3376ed74bc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -253,6 +253,48 @@ public class OracleCDCIT extends AbstractOracleCDCIT 
implements TestResource {
                         });
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and 
manually canceling the canceled task, which is currently only supported by the 
zeta engine.")
+    public void testOracleCdcMetadataTrans(TestContainer container) throws 
Exception {
+
+        clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+        clearTable(DATABASE, SINK_TABLE1);
+
+        insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+        Long jobId = JobIdGenerator.newJobId();
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/oraclecdc_to_metadata_trans.conf", 
String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+        TimeUnit.SECONDS.sleep(20);
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+        try {
+            Container.ExecResult cancelJobResult = 
container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), 
cancelJobResult.getStderr());
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..6a24214f8c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  Oracle-CDC {
+    result_table_name = "customers"
+    username = "system"
+    password = "top_secret"
+    database-names = ["ORCLCDB"]
+    schema-names = ["DEBEZIUM"]
+    base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+    source.reader.close.timeout = 120000
+    connection.pool.size = 1
+    debezium {
+      #  log.mining.strategy = "online_catalog"
+      #  log.mining.continuous.mine = true
+        database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+
+    table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
+    table-names-config = [
+      {
+        table = "ORCLCDB.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"
+        primaryKeys = ["ID"]
+      }
+    ]
+
+    exactly_once = true
+  }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      field_rules = [
+        {
+          field_name = database
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = table
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = rowKind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = ts_ms
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = delay
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
index 0e78978295..bb152c2795 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
@@ -53,6 +53,20 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 3abca057fb..acb9a2a41c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -184,6 +184,49 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and 
manually canceling the canceled task, which is currently only supported by the 
zeta engine.")
+    public void testMPostgresCdcMetadataTrans(TestContainer container) throws 
InterruptedException {
+
+        Long jobId = JobIdGenerator.newJobId();
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/postgrescdc_to_postgres.conf", 
String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_1);
+
+        TimeUnit.SECONDS.sleep(20);
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+
+        try {
+            Container.ExecResult cancelJobResult = 
container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), 
cancelJobResult.getStderr());
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Clear related content to ensure that multiple operations are 
not affected
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_1);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_1);
+        }
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..d0337069b0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  Postgres-CDC {
+    result_table_name = "customers_postgres_cdc"
+    username = "postgres"
+    password = "postgres"
+    database-names = ["postgres_cdc"]
+    schema-names = ["inventory"]
+    table-names = ["postgres_cdc.inventory.postgres_cdc_table_1"]
+    base-url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+    decoding.plugin.name = "decoderbufs"
+  }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      field_rules = [
+        {
+          field_name = database
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = table
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = rowKind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = ts_ms
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = delay
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
index 0c5a0fa8e0..59a673fe86 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
@@ -51,6 +51,21 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>jdbc</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index eb891be771..1b699d5805 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
@@ -307,6 +308,44 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                         });
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "This case requires obtaining the task health status and 
manually canceling the canceled task, which is currently only supported by the 
zeta engine.")
+    public void testSqlServerCDCMetadataTrans(TestContainer container) throws 
InterruptedException {
+        initializeSqlServerTable("column_type_test");
+
+        Long jobId = JobIdGenerator.newJobId();
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/sqlservercdc_to_metadata_trans.conf", 
String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        updateSourceTable(SOURCE_TABLE_CUSTOM_PRIMARY_KEY);
+        TimeUnit.SECONDS.sleep(20);
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+        try {
+            Container.ExecResult cancelJobResult = 
container.cancelJob(String.valueOf(jobId));
+            Assertions.assertEquals(0, cancelJobResult.getExitCode(), 
cancelJobResult.getStderr());
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Executes a JDBC statement using the default jdbc config without 
autocommitting the
      * connection.
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf
new file mode 100644
index 0000000000..49272fc5cf
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  SqlServer-CDC {
+    result_table_name = "customers"
+    username = "sa"
+    password = "Password!"
+    database-names = ["column_type_test"]
+    table-names = ["column_type_test.dbo.full_types_custom_primary_key"]
+    base-url = 
"jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+
+    exactly_once = true
+    table-names-config = [
+      {
+        table = "column_type_test.dbo.full_types_custom_primary_key"
+        primaryKeys = ["id"]
+      }
+    ]
+  }
+}
+
+transform {
+  Metadata {
+    metadata_fields {
+      Database = database
+      Table = table
+      RowKind = rowKind
+      EventTime = ts_ms
+      Delay = delay
+    }
+    result_table_name = "trans_result"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "trans_result"
+    rules {
+      field_rules = [
+        {
+          field_name = database
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = table
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = rowKind
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = ts_ms
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }, {
+          field_name = delay
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index 72584158f6..b55cbae6c7 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -68,6 +68,10 @@ public interface TestContainer extends TestResource {
         throw new UnsupportedOperationException("Not implemented");
     }
 
+    default String getJobStatus(String jobId) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
     String getServerLogs();
 
     void copyFileToContainer(String path, String targetPath);
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index e33d89cc0a..03a0f87be8 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -17,7 +17,10 @@
 
 package org.apache.seatunnel.e2e.common.container.seatunnel;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -25,6 +28,7 @@ import 
org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.apache.commons.compress.utils.Lists;
+import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -96,7 +100,7 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                                                 "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
                         .waitingFor(Wait.forLogMessage(".*received new worker 
register:.*", 1));
         copySeaTunnelStarterToContainer(server);
-        server.setPortBindings(Collections.singletonList("5801:5801"));
+        server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
         server.withCopyFileToContainer(
                 MountableFile.forHostPath(
                         PROJECT_ROOT_PATH
@@ -490,6 +494,24 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         return cancelJob(server, jobId);
     }
 
+    @Override
+    public String getJobStatus(String jobId) {
+        HttpGet get = new HttpGet("http://"; + server.getHost() + 
":8080/job-info/" + jobId);
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            CloseableHttpResponse response = client.execute(get);
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String jobStatus = EntityUtils.toString(response.getEntity());
+                ObjectNode jsonNodes = JsonUtils.parseObject(jobStatus);
+                if (jsonNodes.has("jobStatus")) {
+                    return jsonNodes.get("jobStatus").asText();
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return null;
+    }
+
     @Override
     public String getServerLogs() {
         return server.getLogs();
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 80b928fcdc..2c3d39d0be 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -33,10 +33,10 @@ seatunnel:
         plugin-config:
           namespace: /tmp/seatunnel/checkpoint_snapshot/
     http:
-        enable-http: false
+        enable-http: true
         port: 8080
     telemetry:
       metric:
          enabled: false
       logs:
-         scheduled-deletion-enable: true
\ No newline at end of file
+         scheduled-deletion-enable: true
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
index e91765fbf3..960df97405 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
@@ -46,7 +46,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column;
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                  import org.apache.seatunnel.api.table.catalog.*;
                  import org.apache.seatunnel.api.table.type.*;
                  import java.util.ArrayList;
@@ -84,7 +84,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                  import org.apache.seatunnel.api.table.catalog.CatalogTable
                  import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                  import org.apache.seatunnel.api.table.type.*;
@@ -154,4 +154,4 @@ sink {
        }
    }
 
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
index 8689404a17..5d226764c1 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf
@@ -43,7 +43,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                  import org.apache.seatunnel.api.table.catalog.CatalogTable
                  import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                  import org.apache.seatunnel.api.table.type.*;
@@ -77,7 +77,7 @@ transform {
       compile_pattern="SOURCE_CODE"
       source_code="""
                    import org.apache.seatunnel.api.table.catalog.Column
-                   import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                   import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                    import org.apache.seatunnel.api.table.catalog.CatalogTable
                    import 
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                    import org.apache.seatunnel.api.table.type.*;
@@ -154,4 +154,4 @@ sink {
         ]
       }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
index 9e59a5e535..64272fef83 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf
@@ -46,7 +46,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column;
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                  import org.apache.seatunnel.api.table.catalog.*;
                  import org.apache.seatunnel.api.table.type.*;
                  import java.util.ArrayList;
@@ -84,7 +84,7 @@ transform {
       compile_pattern="SOURCE_CODE"
       source_code="""
                    import org.apache.seatunnel.api.table.catalog.Column;
-                   import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                   import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                    import org.apache.seatunnel.api.table.catalog.*;
                    import org.apache.seatunnel.api.table.type.*;
                    import java.util.ArrayList;
@@ -156,4 +156,4 @@ sink {
        }
    }
 
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
index 7958b88076..661f5562ee 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf
@@ -43,7 +43,7 @@ transform {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                  import org.apache.seatunnel.api.table.catalog.CatalogTable
                  import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                  import org.apache.seatunnel.api.table.type.*;
@@ -108,4 +108,4 @@ sink {
         ]
       }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
index 904066d69b..6aa16b64b9 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf
@@ -44,7 +44,7 @@ transform {
     source_code="""
                  import cn.hutool.http.HttpUtil;
                  import org.apache.seatunnel.api.table.catalog.Column
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                  import org.apache.seatunnel.api.table.catalog.CatalogTable
                  import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                  import org.apache.seatunnel.api.table.type.*;
@@ -112,4 +112,4 @@ Assert {
       }
   }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
index b65877d465..1f732bb306 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf
@@ -43,7 +43,7 @@ DynamicCompile {
     compile_pattern="SOURCE_CODE"
     source_code="""
                  import org.apache.seatunnel.api.table.catalog.Column;
-                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                  import org.apache.seatunnel.api.table.catalog.*;
                  import org.apache.seatunnel.api.table.type.*;
                  import java.util.ArrayList;
@@ -112,4 +112,4 @@ sink {
         ]
       }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
index 9bb6a8fcdf..079e78d677 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 import org.apache.seatunnel.api.table.catalog.Column
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
 import org.apache.seatunnel.api.table.catalog.CatalogTable
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.*;
@@ -39,4 +39,4 @@ class demo  {
         fieldValues[0]="AA"
         return fieldValues;
     }
-};
\ No newline at end of file
+};
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
index 7d1947c077..0fe36b01c0 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile
@@ -18,7 +18,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 
 import java.util.ArrayList;
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 51e1172412..7f6ac1fba7 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
index 5b97f34168..39dd951748 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import lombok.AllArgsConstructor;
 
 @AllArgsConstructor
+@Deprecated
 public class SeaTunnelRowAccessor {
     private final SeaTunnelRow row;
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index 13d25989ac..394242a41b 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
index 1718030db6..75712b5fc8 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
@@ -24,10 +24,10 @@ import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import java.lang.reflect.Array;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
index ea55569420..bfae2b8d2a 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -20,10 +20,10 @@ package org.apache.seatunnel.transform.dynamiccompile;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse;
 import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse;
 import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
index b35df6a448..51e4b8265f 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
 import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
+import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_FIELDS_NOT_FOUND;
+import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_MAPPING_FIELD_EXISTS;
 
 /** The common error of SeaTunnel transform. Please refer {@link CommonError} 
*/
 public class TransformCommonError {
@@ -43,4 +45,18 @@ public class TransformCommonError {
         params.put("transform", transform);
         return new TransformException(INPUT_FIELDS_NOT_FOUND, params);
     }
+
+    public static TransformException cannotFindMetadataFieldError(String 
transform, String field) {
+        Map<String, String> params = new HashMap<>();
+        params.put("field", field);
+        params.put("transform", transform);
+        return new TransformException(METADATA_FIELDS_NOT_FOUND, params);
+    }
+
+    public static TransformException metadataMappingFieldExists(String 
transform, String field) {
+        Map<String, String> params = new HashMap<>();
+        params.put("field", field);
+        params.put("transform", transform);
+        return new TransformException(METADATA_MAPPING_FIELD_EXISTS, params);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index dc5008ec04..b0d72d7cf1 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -25,7 +25,13 @@ public enum TransformCommonErrorCode implements 
SeaTunnelErrorCode {
             "The input field '<field>' of '<transform>' transform not found in 
upstream schema"),
     INPUT_FIELDS_NOT_FOUND(
             "TRANSFORM_COMMON-02",
-            "The input fields '<fields>' of '<transform>' transform not found 
in upstream schema");
+            "The input fields '<fields>' of '<transform>' transform not found 
in upstream schema"),
+    METADATA_FIELDS_NOT_FOUND(
+            "TRANSFORM_COMMON-03",
+            "The metadata fields '<field>' of '<transform>' transform not 
found "),
+    METADATA_MAPPING_FIELD_EXISTS(
+            "TRANSFORM_COMMON-04",
+            "The metadata mapping field '<field>' of '<transform>' transform 
already exists in upstream schema");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 3e14e8488d..7978aa2260 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -23,12 +23,12 @@ import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.format.json.JsonToRowConverters;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
new file mode 100644
index 0000000000..2a1679b500
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.metadata;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.api.table.type.MetadataUtil.isMetadataField;
+
+public class MetadataTransform extends MultipleFieldOutputTransform {
+
+    private List<String> fieldNames;
+    private Map<String, String> metadataFieldMapping;
+
+    public MetadataTransform(ReadonlyConfig config, @NonNull CatalogTable 
inputCatalogTable) {
+        super(inputCatalogTable);
+        initOutputFields(inputCatalogTable, 
config.get(MetadataTransformConfig.METADATA_FIELDS));
+    }
+
+    private void initOutputFields(CatalogTable inputCatalogTable, Map<String, 
String> fields) {
+        List<String> sourceTableFiledNames =
+                
Arrays.asList(inputCatalogTable.getTableSchema().getFieldNames());
+        List<String> fieldNames = new ArrayList<>();
+        for (Map.Entry<String, String> field : fields.entrySet()) {
+            String srcField = field.getKey();
+            if (!isMetadataField(srcField)) {
+                throw 
TransformCommonError.cannotFindMetadataFieldError(getPluginName(), srcField);
+            }
+            String targetField = field.getValue();
+            if (sourceTableFiledNames.contains(targetField)) {
+                throw 
TransformCommonError.metadataMappingFieldExists(getPluginName(), srcField);
+            }
+            fieldNames.add(field.getKey());
+        }
+        this.fieldNames = fieldNames;
+        this.metadataFieldMapping = fields;
+    }
+
+    @Override
+    public String getPluginName() {
+        return MetadataTransformConfig.PLUGIN_NAME;
+    }
+
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object[] value = new Object[fieldNames.size()];
+        for (Map.Entry<String, String> mapping : 
metadataFieldMapping.entrySet()) {
+            String metadataFieldName = mapping.getKey();
+            String mappingFieldName = mapping.getValue();
+            int i = fieldNames.indexOf(metadataFieldName);
+            Object fieldValue = null;
+            switch (CommonOptions.fromName(metadataFieldName)) {
+                case DATABASE:
+                    fieldValue = MetadataUtil.getDatabase(inputRow);
+                    break;
+                case TABLE:
+                    fieldValue = MetadataUtil.getTable(inputRow);
+                    break;
+                case ROW_KIND:
+                    fieldValue = MetadataUtil.getRowKind(inputRow);
+                    break;
+                case DELAY:
+                    fieldValue = MetadataUtil.getDelay(inputRow);
+                    break;
+                case EVENT_TIME:
+                    fieldValue = MetadataUtil.getEventTime(inputRow);
+                    break;
+                case PARTITION:
+                    fieldValue = MetadataUtil.getPartitionStr(inputRow);
+                    break;
+                default:
+                    throw TransformCommonError.cannotFindMetadataFieldError(
+                            getPluginName(), mappingFieldName);
+            }
+            value[i] = fieldValue;
+        }
+        return value;
+    }
+
+    @Override
+    protected Column[] getOutputColumns() {
+        Column[] columns = new Column[fieldNames.size()];
+        for (Map.Entry<String, String> mapping : 
metadataFieldMapping.entrySet()) {
+            String metadataFieldName = mapping.getKey();
+            String mappingFieldName = mapping.getValue();
+            int i = fieldNames.indexOf(metadataFieldName);
+            Column column;
+            switch (CommonOptions.fromName(metadataFieldName)) {
+                case DATABASE:
+                case TABLE:
+                case ROW_KIND:
+                case PARTITION:
+                    column =
+                            PhysicalColumn.of(
+                                    mappingFieldName,
+                                    BasicType.STRING_TYPE,
+                                    (Long) null,
+                                    null,
+                                    true,
+                                    null,
+                                    null);
+                    break;
+                case DELAY:
+                case EVENT_TIME:
+                    column =
+                            PhysicalColumn.of(
+                                    mappingFieldName,
+                                    BasicType.LONG_TYPE,
+                                    (Long) null,
+                                    null,
+                                    true,
+                                    null,
+                                    null);
+                    break;
+                default:
+                    throw TransformCommonError.cannotFindMetadataFieldError(
+                            getPluginName(), mappingFieldName);
+            }
+            columns[i] = column;
+        }
+        return columns;
+    }
+
+    @VisibleForTesting
+    public void initRowContainerGenerator() {
+        transformTableSchema();
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java
similarity index 53%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java
index 5b97f34168..fe9971d3a1 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java
@@ -15,34 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.common;
+package org.apache.seatunnel.transform.metadata;
 
-import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig;
 
-import lombok.AllArgsConstructor;
+import java.util.Map;
 
-@AllArgsConstructor
-public class SeaTunnelRowAccessor {
-    private final SeaTunnelRow row;
+public class MetadataTransformConfig extends ModelTransformConfig {
 
-    public int getArity() {
-        return row.getArity();
-    }
+    public static final String PLUGIN_NAME = "Metadata";
 
-    public String getTableId() {
-        return row.getTableId();
-    }
-
-    public RowKind getRowKind() {
-        return row.getRowKind();
-    }
-
-    public Object getField(int pos) {
-        return row.getField(pos);
-    }
-
-    public Object[] getFields() {
-        return row.getFields();
-    }
+    public static final Option<Map<String, String>> METADATA_FIELDS =
+            Options.key("metadata_fields")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify the metadata field relationship between 
input and output");
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java
new file mode 100644
index 0000000000..ebbacb5cd0
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.metadata;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MetadataTransformFactory implements TableTransformFactory {
+    @Override
+    public String factoryIdentifier() {
+        return MetadataTransformConfig.PLUGIN_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return 
OptionRule.builder().required(MetadataTransformConfig.METADATA_FIELDS).build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        return () -> new MetadataTransform(context.getOptions(), 
context.getCatalogTables().get(0));
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
index 9e77043f0a..ce6d864da6 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java
@@ -21,10 +21,10 @@ import 
org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.VectorType;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 import org.apache.seatunnel.transform.nlpmodel.ModelProvider;
 import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
index 069945951b..c99b03776e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
 import org.apache.seatunnel.transform.nlpmodel.ModelProvider;
 import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
index 5c5451fce7..b2c9fa44ce 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.transform.replace;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
index ae7b1fcaa1..354e5a3dd5 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
 
 import com.google.common.annotations.VisibleForTesting;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
index c1ead2dd0b..922cdbd97c 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import lombok.NonNull;
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
new file mode 100644
index 0000000000..a3ddf1ced4
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.metadata;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetadataTransformTest {
+
+    static CatalogTable catalogTable;
+
+    static Object[] values;
+
+    static SeaTunnelRow inputRow;
+
+    static Long eventTime;
+
+    @BeforeAll
+    static void setUp() {
+        catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("catalog", TablePath.DEFAULT),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key1",
+                                                BasicType.STRING_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key2",
+                                                BasicType.INT_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key3",
+                                                BasicType.LONG_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key4",
+                                                BasicType.DOUBLE_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "key5",
+                                                BasicType.FLOAT_TYPE,
+                                                1L,
+                                                Boolean.FALSE,
+                                                null,
+                                                null))
+                                .build(),
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "comment");
+        values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 
3.14};
+        inputRow = new SeaTunnelRow(values);
+        inputRow.setTableId(TablePath.DEFAULT.getFullName());
+        eventTime = 
LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli();
+        MetadataUtil.setDelay(inputRow, 150L);
+        MetadataUtil.setEventTime(inputRow, eventTime);
+        MetadataUtil.setPartition(inputRow, Arrays.asList("key1", 
"key2").toArray(new String[0]));
+    }
+
+    @Test
+    void testMetadataTransform() {
+        Map<String, String> metadataMapping = new HashMap<>();
+        metadataMapping.put("Database", "database");
+        metadataMapping.put("Table", "table");
+        metadataMapping.put("Partition", "partition");
+        metadataMapping.put("RowKind", "rowKind");
+        metadataMapping.put("EventTime", "ts_ms");
+        metadataMapping.put("Delay", "delay");
+        Map<String, Object> config = new HashMap<>();
+        config.put("metadata_fields", metadataMapping);
+        MetadataTransform transform =
+                new MetadataTransform(ReadonlyConfig.fromMap(config), 
catalogTable);
+        transform.initRowContainerGenerator();
+        SeaTunnelRow outputRow = transform.map(inputRow);
+        Assertions.assertEquals(values.length + 6, outputRow.getArity());
+        Assertions.assertEquals(
+                "SeaTunnelRow{tableId=default.default.default, kind=+I, 
fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, key1,key2, default, "
+                        + eventTime
+                        + ", +I, default, 150]}",
+                outputRow.toString());
+    }
+}

Reply via email to