This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4042767b94 [doc][transforms-v2] improve meatadata and rowkind doc (#10010)
4042767b94 is described below

commit 4042767b94ef861f7810e255a24e53c1cc5360f3
Author: Jast <[email protected]>
AuthorDate: Thu Nov 6 22:47:43 2025 +0800

    [doc][transforms-v2] improve meatadata and rowkind doc (#10010)
---
 docs/en/transform-v2/metadata.md          | 174 ++++++++++++----
 docs/en/transform-v2/rowkind-extractor.md | 327 ++++++++++++++++++++++-------
 docs/zh/transform-v2/metadata.md          | 176 ++++++++++++----
 docs/zh/transform-v2/rowkind-extractor.md | 334 +++++++++++++++++++++++-------
 4 files changed, 771 insertions(+), 240 deletions(-)

diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index 89c0bea651..be4be03c17 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -3,83 +3,171 @@
 > Metadata transform plugin
 
 ## Description
-Metadata transform plugin for adding metadata fields to data
 
-## Available Metadata
+The Metadata transform plugin is used to extract metadata information from data rows and convert it into regular fields for subsequent processing and analysis.
 
-|    Key    | DataType | Description                                                                                              |
-|:---------:|:--------:|:---------------------------------------------------------------------------------------------------------|
-| Database  |  string  | Name of the table that contain the row.                                                                  |
-|   Table   |  string  | Name of the table that contain the row.                                                                  |
-|  RowKind  |  string  | The type of operation                                                                                    |
-| EventTime |   Long   | The time at which the connector processed the event.And the data should be milliseconds                  |
-|   Delay   |   Long   | The difference between data extraction time and database change time.And the data should be milliseconds |
-| Partition |  string  | Contains the partition field of the corresponding number table of the row, multiple using `,` join       |
+**Core Features:**
+- Extracts metadata (such as database name, table name, row type, etc.) as visible fields
+- Supports custom output field names
+- Does not modify original data fields, only adds metadata fields
 
-### note
-    `Delay` `EventTime` only worked on cdc series connectors for now , except TiDB-CDC
+**Typical Use Cases:**
+- Recording data source (database name, table name) during CDC data synchronization
+- Tracking data change types (INSERT, UPDATE, DELETE)
+- Recording event time and delay information of data
+- Identifying data sources when merging multiple tables
+
+## Supported Metadata Fields
+
+|    Metadata Key    | Output Type |          Description          | Data Source |
+|:---------:|:--------:|:-----------------------------:|:----:|
+| Database  |  string  |  Name of the database containing the data  | All connectors |
+|   Table   |  string  |  Name of the table containing the data  | All connectors |
+|  RowKind  |  string  |  Row change type, values: +I (insert), -U (update before), +U (update after), -D (delete)  | All connectors |
+| EventTime |   long   |  Event timestamp of data change (milliseconds)  | CDC connectors |
+|   Delay   |   long   |  Data collection delay time (milliseconds), i.e., the difference between data extraction time and database change time  | CDC connectors |
+| Partition |  string  |  Partition information of the data, multiple partition fields separated by commas  | Connectors supporting partitions |
+
+### Important Notes
+
+1. **Metadata field names are case-sensitive**: Configuration must strictly follow the Key names in the table above (e.g., `Database`, `Table`, `RowKind`, etc.)
+2. **CDC-specific fields**: `EventTime` and `Delay` are only valid when using CDC connectors (except TiDB-CDC)
 
 ## Options
 
-|      name       | type | required | default value | Description                                                               |
-|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
-| metadata_fields | map  | yes      |               | A mapping metadata input fields and their corresponding output fields.    |
+|      name       | type | required | default value | description       |
+|:---------------:|------|:--------:|:-------------:|-------------------|
+| metadata_fields | map  |    no     |   empty map   | Mapping relationship between metadata fields and output fields, format: `Metadata Key = output field name` |
 
 ### metadata_fields [map]
 
-A mapping between metadata fields and their respective output fields. 
+Defines the mapping relationship between metadata fields and output fields.
 
+**Configuration Format:**
 ```hocon
 metadata_fields {
-  Database = c_database
-  Table = c_table
-  RowKind = c_rowKind
-  EventTime = c_ts_ms
-  Delay = c_delay
+  <Metadata Key> = <output field name>
+  <Metadata Key> = <output field name>
+  ...
 }
 ```
 
-## Examples
+**Configuration Example:**
+```hocon
+metadata_fields {
+  Database = source_db      # Map database name to source_db field
+  Table = source_table      # Map table name to source_table field
+  RowKind = op_type         # Map row type to op_type field
+  EventTime = event_ts      # Map event time to event_ts field
+  Delay = sync_delay        # Map delay time to sync_delay field
+  Partition = partition_info # Map partition info to partition_info field
+}
+```
 
-```yaml
+**Notes:**
+- The left side must be a supported metadata Key (see table above), and is strictly case-sensitive
+- The right side is a custom output field name, which cannot duplicate existing field names
+- You can select only the metadata fields you need, not all of them must be configured
+
+## Complete Examples
+
+### Example 1: MySQL CDC Data Synchronization, Extracting All Metadata
 
+Synchronizing data from a MySQL database and extracting all available metadata information.
+
+```yaml
 env {
-    parallelism = 1
-    job.mode = "STREAMING"
-    checkpoint.interval = 5000
-    read_limit.bytes_per_second = 7000000
-    read_limit.rows_per_second = 400
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
 }
 
 source {
-    MySQL-CDC {
-        plugin_output = "customers_mysql_cdc"
-        server-id = 5652
-        username = "root"
-        password = "zdyk_Dev@2024"
-        table-names = ["source.user"]
-        url = "jdbc:mysql://172.16.17.123:3306/source"
-    }
+  MySQL-CDC {
+    plugin_output = "mysql_cdc_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["mydb.users"]
+    url = "jdbc:mysql://localhost:3306/mydb"
+  }
 }
 
 transform {
   Metadata {
+    plugin_input = "mysql_cdc_source"
+    plugin_output = "metadata_added"
     metadata_fields {
-      Database = database
-      Table = table
-      RowKind = rowKind
-      EventTime = ts_ms
-      Delay = delay
+      Database = source_database    # Extract database name
+      Table = source_table          # Extract table name
+      RowKind = change_type         # Extract change type
+      EventTime = event_timestamp   # Extract event time
+      Delay = sync_delay_ms         # Extract sync delay
     }
-    plugin_output = "trans_result"
   }
 }
 
 sink {
   Console {
-    plugin_input = "custom_name"
+    plugin_input = "metadata_added"
   }
 }
+```
 
+**Input Data Example:**
 ```
+Original data row (from mydb.users table):
+id=1, name="John", age=25
+RowKind: +I (INSERT)
+```
+
+**Output Data Example:**
+```
+Transformed data row:
+id=1, name="John", age=25, source_database="mydb", source_table="users",
+change_type="+I", event_timestamp=1699000000000, sync_delay_ms=100
+```
+
+---
 
+### Example 2: Extracting Only Partial Metadata
+
+Extracting only data source information (database name and table name) for multi-table merge scenarios.
+
+```yaml
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MySQL-CDC {
+    plugin_output = "multi_table_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["db1.orders", "db2.orders"]
+    url = "jdbc:mysql://localhost:3306"
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "multi_table_source"
+    plugin_output = "with_source_info"
+    metadata_fields {
+      Database = db_name
+      Table = table_name
+    }
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "with_source_info"
+    url = "jdbc:mysql://localhost:3306/target_db"
+    table = "merged_orders"
+    # Target table will contain db_name and table_name fields to identify data source
+  }
+}
+```
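
The `metadata_fields` mapping documented in the patch above can be modelled with a small Python sketch. This is an illustration only, not SeaTunnel code: `add_metadata_fields` and `SUPPORTED_KEYS` are hypothetical names, raising `ValueError` for an unknown key or a duplicate output field is an assumption about how such a configuration would be rejected, and emitting `None` for metadata a connector does not supply is likewise an assumption.

```python
from typing import Any, Dict, Mapping

# Keys from the "Supported Metadata Fields" table; matching is case-sensitive.
SUPPORTED_KEYS = ("Database", "Table", "RowKind", "EventTime", "Delay", "Partition")


def add_metadata_fields(
    fields: Mapping[str, Any],
    metadata: Mapping[str, Any],
    metadata_fields: Mapping[str, str],
) -> Dict[str, Any]:
    """Return a copy of `fields` with the mapped metadata appended (hypothetical helper)."""
    out = dict(fields)  # original data fields are left unchanged
    for key, output_name in metadata_fields.items():
        if key not in SUPPORTED_KEYS:
            # Assumption: an unsupported (or wrongly cased) key is an error.
            raise ValueError(f"unsupported metadata key: {key!r}")
        if output_name in out:
            # Assumption: a clash with an existing field name is an error.
            raise ValueError(f"output field already exists: {output_name!r}")
        # Assumption: metadata the connector does not supply becomes None.
        out[output_name] = metadata.get(key)
    return out


# Values taken from Example 1 in the patch.
row = add_metadata_fields(
    {"id": 1, "name": "John", "age": 25},
    {"Database": "mydb", "Table": "users", "RowKind": "+I",
     "EventTime": 1699000000000, "Delay": 100},
    {"Database": "source_database", "Table": "source_table",
     "RowKind": "change_type", "EventTime": "event_timestamp",
     "Delay": "sync_delay_ms"},
)
print(row)
```

The sketch mirrors the three notes in the patch: keys on the left are checked against the supported list, names on the right must be new, and only the keys present in the mapping are added.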
diff --git a/docs/en/transform-v2/rowkind-extractor.md b/docs/en/transform-v2/rowkind-extractor.md
index a2ee384c34..214b215311 100644
--- a/docs/en/transform-v2/rowkind-extractor.md
+++ b/docs/en/transform-v2/rowkind-extractor.md
@@ -4,110 +4,287 @@
 
 ## Description
 
-transform cdc row to append only row that contains the cdc RowKind. <br />
-Example: <br />
-CDC row: -D 1, test1, test2 <br />
-transformed Row: +I 1,test1,test2,DELETE
+The RowKindExtractor transform plugin is used to convert CDC (Change Data Capture) data streams into Append-Only mode while extracting the original RowKind information as a new field.
+
+**Core Features:**
+- Converts all data rows' RowKind to `+I` (INSERT), achieving Append-Only mode
+- Saves the original RowKind information (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) to a newly added field
+- Supports both short format and full format output
+
+**Why is this plugin needed?**
+
+In CDC data synchronization scenarios, data rows carry RowKind markers (+I, -U, +U, -D) representing different change types. However, some downstream systems (such as data lakes, analytical systems) only support Append-Only mode and do not support UPDATE and DELETE operations. In such cases, you need to:
+1. Convert all data to INSERT type (Append-Only)
+2. Save the original change type as a regular field for subsequent analysis
+
+**Transformation Example:**
+
+```
+Input (CDC data):
+  RowKind: -D (DELETE)
+  Data: id=1, name="test1", age=20
+
+Output (Append-Only data):
+  RowKind: +I (INSERT)
+  Data: id=1, name="test1", age=20, row_kind="DELETE"  (with transform_type = FULL; the default SHORT yields "-D")
+```
+
+**Typical Use Cases:**
+- Writing CDC data to data lakes that only support Append mode
+- Preserving complete change history in data warehouses
+- Performing statistical analysis on different types of changes
 
 ## Options
 
-| name              | type   | required | default value |
-|-------------------|--------|----------|---------------|
-| custom_field_name | string | yes      | row_kind      |
-| transform_type    | enum   | yes      | SHORT         |
+| name              | type   | required | default value | description |
+|-------------------|--------|----------|---------------|-------------|
+| custom_field_name | string | no       | row_kind      | The name of the new field used to store the original RowKind information |
+| transform_type    | enum   | no       | SHORT         | The output format of RowKind, options: SHORT (short format) or FULL (full format) |
 
 ### custom_field_name [string]
 
-Custom field name of the RowKind field 
+Specifies the name of the new field that will store the original RowKind information.
+
+**Default value:** `row_kind`
+
+**Notes:**
+- The field name cannot duplicate existing field names, otherwise an error will be thrown
+- It's recommended to use meaningful names, such as `operation_type`, `change_type`, `cdc_op`, etc.
+
+**Example:**
+```hocon
+custom_field_name = "operation_type"  # Use custom field name
+```
 
 ### transform_type [enum]
 
-the RowKind field value formatting , the option can be `SHORT` or `FULL`
+Specifies the output format of the RowKind field value.
+
+**Available options:**
+
+| Format | Description | Output Values |
+|--------|-------------|---------------|
+| SHORT | Short format (symbol representation) | `+I`, `-U`, `+U`, `-D` |
+| FULL | Full format (English names) | `INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE` |
+
+**Default value:** `SHORT`
+
+**Meaning of each value:**
+
+| RowKind Type | SHORT Format | FULL Format | Description |
+|--------------|--------------|-------------|-------------|
+| INSERT | +I | INSERT | Insert operation |
+| UPDATE_BEFORE | -U | UPDATE_BEFORE | Value before update |
+| UPDATE_AFTER | +U | UPDATE_AFTER | Value after update |
+| DELETE | -D | DELETE | Delete operation |
+
+**Selection Recommendations:**
+- **SHORT format**: Saves storage space, suitable for storage-sensitive scenarios
+- **FULL format**: Better readability, suitable for scenarios requiring manual review or analysis
+
+**Example:**
+```hocon
+transform_type = FULL  # Use full format
+```
+
+## Complete Examples
+
+### Example 1: Using Default Configuration (SHORT Format)
+
+Using default configuration to convert CDC data to Append-Only mode, with RowKind saved in short format.
+
+```yaml
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MySQL-CDC {
+    plugin_output = "cdc_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["mydb.users"]
+    url = "jdbc:mysql://localhost:3306/mydb"
+  }
+}
+
+transform {
+  RowKindExtractor {
+    plugin_input = "cdc_source"
+    plugin_output = "append_only_data"
+    # Using default configuration:
+    # custom_field_name = "row_kind"
+    # transform_type = SHORT
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "append_only_data"
+  }
+}
+```
+
+**Data Transformation Process:**
+
+```
+Input data (CDC format):
+  1. RowKind=+I, id=1, name="John", age=25
+  2. RowKind=-U, id=1, name="John", age=25
+  3. RowKind=+U, id=1, name="John", age=26
+  4. RowKind=-D, id=1, name="John", age=26
+
+Output data (Append-Only format):
+  1. RowKind=+I, id=1, name="John", age=25, row_kind="+I"
+  2. RowKind=+I, id=1, name="John", age=25, row_kind="-U"
+  3. RowKind=+I, id=1, name="John", age=26, row_kind="+U"
+  4. RowKind=+I, id=1, name="John", age=26, row_kind="-D"
+```
 
-`SHORT` : +I, -U , +U, -D
-`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+---
 
-## Examples
+### Example 2: Using FULL Format with Custom Field Name
 
+Using full format to output RowKind with a custom field name.
 
 ```yaml
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MySQL-CDC {
+    plugin_output = "cdc_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["mydb.orders"]
+    url = "jdbc:mysql://localhost:3306/mydb"
+  }
+}
+
+transform {
+  RowKindExtractor {
+    plugin_input = "cdc_source"
+    plugin_output = "append_only_data"
+    custom_field_name = "operation_type"  # Custom field name
+    transform_type = FULL                 # Use full format
+  }
+}
+
+sink {
+  Iceberg {
+    plugin_input = "append_only_data"
+    catalog_name = "iceberg_catalog"
+    database = "mydb"
+    table = "orders_history"
+    # Iceberg table will contain operation_type field, recording the change type of each data row
+  }
+}
+```
+
+**Data Transformation Process:**
+
+```
+Input data (CDC format):
+  1. RowKind=+I, order_id=1001, amount=100.00
+  2. RowKind=-U, order_id=1001, amount=100.00
+  3. RowKind=+U, order_id=1001, amount=150.00
+  4. RowKind=-D, order_id=1001, amount=150.00
+
+Output data (Append-Only format, FULL format):
+  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
+  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
+  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
+  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
+```
+
+---
 
+### Example 3: Complete Test Example (Using FakeSource)
+
+Using FakeSource to generate test data, demonstrating the transformation effects of various RowKinds.
+
+```yaml
 env {
-    parallelism = 1
-    job.mode = "BATCH"
+  parallelism = 1
+  job.mode = "BATCH"
 }
 
 source {
-    FakeSource {
-        schema = {
-            fields {
-                pk_id = bigint
-                name = string
-                score = int
-            }
-            primaryKey {
-                name = "pk_id"
-                columnNames = [pk_id]
-            }
-        }
-        rows = [
-            {
-                kind = INSERT
-                fields = [1, "A", 100]
-            },
-            {
-                kind = INSERT
-                fields = [2, "B", 100]
-            },
-            {
-                kind = INSERT
-                fields = [3, "C", 100]
-            },
-            {
-                kind = INSERT
-                fields = [4, "D", 100]
-            },
-            {
-                kind = UPDATE_BEFORE
-                fields = [1, "A", 100]
-            },
-            {
-                kind = UPDATE_AFTER
-                fields = [1, "F", 100]
-            }
-            {
-                kind = UPDATE_BEFORE
-                fields = [2, "B", 100]
-            },
-            {
-                kind = UPDATE_AFTER
-                fields = [2, "G", 100]
-            },
-            {
-                kind = DELETE
-                fields = [3, "C", 100]
-            },
-            {
-                kind = DELETE
-                fields = [4, "D", 100]
-            }
-        ]
+  FakeSource {
+    plugin_output = "fake_cdc_data"
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
     }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_updated", 95]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [2, "B", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [2, "B_updated", 98]
+      },
+      {
+        kind = DELETE
+        fields = [1, "A_updated", 95]
+      }
+    ]
+  }
 }
 
 transform {
   RowKindExtractor {
-        custom_field_name = "custom_name"
-        transform_type = FULL
-        plugin_output = "trans_result"
-    }
+    plugin_input = "fake_cdc_data"
+    plugin_output = "transformed_data"
+    custom_field_name = "change_type"
+    transform_type = FULL
+  }
 }
 
 sink {
   Console {
-    plugin_input = "custom_name"
+    plugin_input = "transformed_data"
   }
 }
-
 ```
 
+**Expected Output:**
+
+```
++I, pk_id=1, name="A", score=100, change_type="INSERT"
++I, pk_id=2, name="B", score=100, change_type="INSERT"
++I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
++I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
++I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
+```
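
The RowKindExtractor behaviour documented in the patch above (every row becomes `+I`, and the original kind is stored in a new field in SHORT or FULL form) can be sketched in Python. This is a hedged model of the documented semantics, not the plugin's implementation: `extract_row_kind` and `SHORT_TO_FULL` are hypothetical names, and using `ValueError` for the duplicate-field case is an assumption (the documentation only says an error is thrown).

```python
from typing import Any, Dict, Mapping, Tuple

# SHORT -> FULL mapping, copied from the "Meaning of each value" table.
SHORT_TO_FULL = {
    "+I": "INSERT",
    "-U": "UPDATE_BEFORE",
    "+U": "UPDATE_AFTER",
    "-D": "DELETE",
}


def extract_row_kind(
    row_kind: str,
    fields: Mapping[str, Any],
    custom_field_name: str = "row_kind",
    transform_type: str = "SHORT",
) -> Tuple[str, Dict[str, Any]]:
    """Model of the transform: returns (new RowKind, new fields). Hypothetical helper."""
    if row_kind not in SHORT_TO_FULL:
        raise ValueError(f"unknown RowKind: {row_kind!r}")
    if transform_type not in ("SHORT", "FULL"):
        raise ValueError(f"unknown transform_type: {transform_type!r}")
    if custom_field_name in fields:
        # Assumption: the documented "error will be thrown" is modelled as ValueError.
        raise ValueError(f"field already exists: {custom_field_name!r}")
    value = row_kind if transform_type == "SHORT" else SHORT_TO_FULL[row_kind]
    out = dict(fields)
    out[custom_field_name] = value
    return "+I", out  # every output row is an INSERT (Append-Only)


# A subset of the FakeSource rows from Example 3, with the same options.
rows = [
    ("+I", {"pk_id": 1, "name": "A", "score": 100}),
    ("-U", {"pk_id": 1, "name": "A", "score": 100}),
    ("+U", {"pk_id": 1, "name": "A_updated", "score": 95}),
    ("-D", {"pk_id": 1, "name": "A_updated", "score": 95}),
]
converted = [extract_row_kind(k, f, "change_type", "FULL") for k, f in rows]
for kind, data in converted:
    print(kind, data)
```

Calling the helper with its defaults models Example 1 instead: the new field is named `row_kind` and carries the short symbols.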
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index 72ebd320f8..4a714901a7 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -1,85 +1,173 @@
 # Metadata
 
-> Metadata transform plugin
+> Metadata 转换插件
 
-## Description
-元数据转换插件,用于将元数据字段添加到数据中
+## 描述
 
-## 支持的元数据
+Metadata 转换插件用于将数据行中的元数据信息提取并转换为普通字段,方便后续处理和分析。
 
-|    Key    | DataType |          Description          |
-|:---------:|:--------:|:-----------------------------:|
-| Database  |  string  |           包含该行的数据库名           |
-|   Table   |  string  |           包含该行的数表名            |
-|  RowKind  |  string  |              行类型              |
-| EventTime |   Long   |    该行的对应的数据时间,统一格式是到毫秒的时间戳    |
-|   Delay   |   Long   | 数据抽取时间与数据库变更时间的差,统一格式是到毫秒的时间戳 |
-| Partition |  string  |    包含该行对应数表的分区字段,多个使用`,`连接    |
+**核心功能:**
+- 将元数据(如数据库名、表名、行类型等)提取为可见字段
+- 支持自定义输出字段名称
+- 不改变原有数据字段,只是新增元数据字段
 
-### 注意事项
-    `Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外
+**典型应用场景:**
+- CDC 数据同步时需要记录数据来源(库名、表名)
+- 需要追踪数据变更类型(INSERT、UPDATE、DELETE)
+- 需要记录数据的事件时间和延迟信息
+- 多表合并时需要标识数据来源
+
+## 支持的元数据字段
+
+|    元数据Key    | 输出类型 |          说明          | 数据来源 |
+|:---------:|:--------:|:-----------------------------:|:----:|
+| Database  |  string  |  数据所属的数据库名称  | 所有连接器 |
+|   Table   |  string  |  数据所属的表名称  | 所有连接器 |
+|  RowKind  |  string  |  行的变更类型,值为:+I(插入)、-U(更新前)、+U(更新后)、-D(删除)  | 所有连接器 |
+| EventTime |   long   |  数据变更的事件时间戳(毫秒)  | CDC 连接器 |
+|   Delay   |   long   |  数据采集延迟时间(毫秒),即数据抽取时间与数据库变更时间的差值  | CDC 连接器 |
+| Partition |  string  |  数据所属的分区信息,多个分区字段使用逗号分隔  | 支持分区的连接器 |
+
+### 重要说明
+
+1. **元数据字段区分大小写**:配置时必须严格按照上表中的 Key 名称(如 `Database`、`Table`、`RowKind` 等)
+2. **CDC 专有字段**:`EventTime` 和 `Delay` 仅在使用 CDC 连接器时有效(TiDB-CDC 除外)
 
 ## 配置选项
 
-|      name       | type | required | default value | Description       |
+|      参数名       | 类型 | 是否必填 | 默认值 | 说明       |
 |:---------------:|------|:--------:|:-------------:|-------------------|
-| metadata_fields | map  |    是     |       -       | 元数据字段与输入字段相应的映射关系 |
+| metadata_fields | map  |    否     |   空映射   | 元数据字段与输出字段的映射关系,格式为 `元数据Key = 输出字段名` |
 
 ### metadata_fields [map]
 
-元数据字段和相应的输出字段之间的映射关系
+定义元数据字段到输出字段的映射关系。
 
+**配置格式:**
 ```hocon
 metadata_fields {
-  database = c_database
-  table = c_table
-  rowKind = c_rowKind
-  ts_ms = c_ts_ms
-  delay = c_delay
+  <元数据Key> = <输出字段名>
+  <元数据Key> = <输出字段名>
+  ...
 }
 ```
 
-## 示例
+**配置示例:**
+```hocon
+metadata_fields {
+  Database = source_db      # 将数据库名映射到 source_db 字段
+  Table = source_table      # 将表名映射到 source_table 字段
+  RowKind = op_type         # 将行类型映射到 op_type 字段
+  EventTime = event_ts      # 将事件时间映射到 event_ts 字段
+  Delay = sync_delay        # 将延迟时间映射到 sync_delay 字段
+  Partition = partition_info # 将分区信息映射到 partition_info 字段
+}
+```
 
-```yaml
+**注意事项:**
+- 左侧必须是支持的元数据 Key(见上表),且严格区分大小写
+- 右侧是自定义的输出字段名,不能与原有字段重名
+- 可以只选择需要的元数据字段,不必全部配置
+
+## 完整示例
+
+### 示例 1:MySQL CDC 数据同步,提取所有元数据
 
+从 MySQL 数据库同步数据,并提取所有可用的元数据信息。
+
+```yaml
 env {
-    parallelism = 1
-    job.mode = "STREAMING"
-    checkpoint.interval = 5000
-    read_limit.bytes_per_second = 7000000
-    read_limit.rows_per_second = 400
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
 }
 
 source {
-    MySQL-CDC {
-        plugin_output = "customers_mysql_cdc"
-        server-id = 5652
-        username = "root"
-        password = "zdyk_Dev@2024"
-        table-names = ["source.user"]
-        url = "jdbc:mysql://172.16.17.123:3306/source"
-    }
+  MySQL-CDC {
+    plugin_output = "mysql_cdc_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["mydb.users"]
+    url = "jdbc:mysql://localhost:3306/mydb"
+  }
 }
 
 transform {
   Metadata {
+    plugin_input = "mysql_cdc_source"
+    plugin_output = "metadata_added"
     metadata_fields {
-        Database = database
-        Table = table
-        RowKind = rowKind
-        EventTime = ts_ms
-        Delay = delay
+      Database = source_database    # 提取数据库名
+      Table = source_table          # 提取表名
+      RowKind = change_type         # 提取变更类型
+      EventTime = event_timestamp   # 提取事件时间
+      Delay = sync_delay_ms         # 提取同步延迟
     }
-    plugin_output = "trans_result"
   }
 }
 
 sink {
   Console {
-  plugin_input = "custom_name"
+    plugin_input = "metadata_added"
   }
 }
+```
 
+**输入数据示例:**
 ```
+原始数据行(来自 mydb.users 表):
+id=1, name="张三", age=25
+RowKind: +I (INSERT)
+```
+
+**输出数据示例:**
+```
+转换后的数据行:
+id=1, name="张三", age=25, source_database="mydb", source_table="users",
+change_type="+I", event_timestamp=1699000000000, sync_delay_ms=100
+```
+
+---
 
+### 示例 2:只提取部分元数据
+
+只提取数据来源信息(库名和表名),用于多表合并场景。
+
+```yaml
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MySQL-CDC {
+    plugin_output = "multi_table_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["db1.orders", "db2.orders"]
+    url = "jdbc:mysql://localhost:3306"
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "multi_table_source"
+    plugin_output = "with_source_info"
+    metadata_fields {
+      Database = db_name
+      Table = table_name
+    }
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "with_source_info"
+    url = "jdbc:mysql://localhost:3306/target_db"
+    table = "merged_orders"
+    # 目标表会包含 db_name 和 table_name 字段,用于标识数据来源
+  }
+}
+```
diff --git a/docs/zh/transform-v2/rowkind-extractor.md b/docs/zh/transform-v2/rowkind-extractor.md
index cfa4d8fd6c..7813b27f6d 100644
--- a/docs/zh/transform-v2/rowkind-extractor.md
+++ b/docs/zh/transform-v2/rowkind-extractor.md
@@ -1,112 +1,290 @@
 # RowKindExtractor
 
-> RowKindExtractor transform plugin
+> RowKindExtractor 转换插件
 
-## Description
+## 描述
 
-将CDC Row 转换为 Append only Row, 转换后的行扩展了RowKind字段 <br />
-Example: <br />
-CDC row: -D 1, test1, test2 <br />
-transformed Row: +I 1,test1,test2,DELETE
+RowKindExtractor 转换插件用于将 CDC(Change Data Capture)数据流转换为 Append-Only(仅追加)模式,同时将原始的 RowKind 信息提取为一个新的字段。
 
-## Options
+**核心功能:**
+- 将所有数据行的 RowKind 统一改为 `+I`(INSERT),实现 Append-Only 模式
+- 将原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
+- 支持短格式和完整格式两种输出方式
 
-| name              | type   | required | default value |
-|-------------------|--------|----------|---------------|
-| custom_field_name | string | yes      | row_kind      |
-| transform_type    | enum   | yes      | SHORT         |
+**为什么需要这个插件?**
+
+在 CDC 数据同步场景中,数据行带有 RowKind 标记(+I、-U、+U、-D),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此时需要:
+1. 将所有数据转换为 INSERT 类型(Append-Only)
+2. 将原始的变更类型保存为普通字段,供后续分析使用
+
+**转换示例:**
+
+```
+输入(CDC 数据):
+  RowKind: -D (DELETE)
+  数据: id=1, name="test1", age=20
+
+输出(Append-Only 数据):
+  RowKind: +I (INSERT)
+  数据: id=1, name="test1", age=20, row_kind="DELETE"
+```
+
+**典型应用场景:**
+- 将 CDC 数据写入只支持 Append 的数据湖
+- 需要在数据仓库中保留完整的变更历史记录
+- 需要对不同类型的变更进行统计分析
+
+## 配置选项
+
+| 参数名              | 类型   | 是否必填 | 默认值 | 说明 |
+|-------------------|--------|----------|---------------|------|
+| custom_field_name | string | 否      | row_kind      | 新增字段的名称,用于存储原始的 RowKind 信息 |
+| transform_type    | enum   | 否      | SHORT         | RowKind 的输出格式,可选值:SHORT(短格式)或 FULL(完整格式) |
 
 ### custom_field_name [string]
 
-RowKind列的自定义名
+指定新增字段的名称,该字段用于存储原始的 RowKind 信息。
+
+**默认值:** `row_kind`
+
+**注意事项:**
+- 字段名不能与原有字段重名,否则会报错
+- 建议使用有意义的名称,如 `operation_type`、`change_type`、`cdc_op` 等
+
+**示例:**
+```hocon
+custom_field_name = "operation_type"  # 使用自定义字段名
+```
 
 ### transform_type [enum]
 
-格式化RowKind值 , 配置为 `SHORT` 或 `FULL`
+指定 RowKind 字段值的输出格式。
+
+**可选值:**
+
+| 格式 | 说明 | 输出值 |
+|------|------|--------|
+| SHORT | 短格式(符号表示) | `+I`、`-U`、`+U`、`-D` |
+| FULL | 完整格式(英文名称) | `INSERT`、`UPDATE_BEFORE`、`UPDATE_AFTER`、`DELETE` |
+
+**默认值:** `SHORT`
+
+**各值含义:**
+
+| RowKind 类型 | SHORT 格式 | FULL 格式 | 说明    |
+|-------------|-----------|----------|-------|
+| INSERT | +I | INSERT | 插入操作  |
+| UPDATE_BEFORE | -U | UPDATE_BEFORE | 更新前的值 |
+| UPDATE_AFTER | +U | UPDATE_AFTER | 更新后的值 |
+| DELETE | -D | DELETE | 删除操作  |
+
+**选择建议:**
+- **SHORT 格式**:节省存储空间,适合对存储敏感的场景
+- **FULL 格式**:可读性更好,适合需要人工查看或分析的场景
+
+**示例:**
+```hocon
+transform_type = FULL  # 使用完整格式
+```
 
-`SHORT` : +I, -U , +U, -D
-`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+## 完整示例
 
-## Examples
+### 示例 1:使用默认配置(SHORT 格式)
+
+使用默认配置,将 CDC 数据转换为 Append-Only 模式,RowKind 以短格式保存。
 
 ```yaml
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MySQL-CDC {
+    plugin_output = "cdc_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["mydb.users"]
+    url = "jdbc:mysql://localhost:3306/mydb"
+  }
+}
+
+transform {
+  RowKindExtractor {
+    plugin_input = "cdc_source"
+    plugin_output = "append_only_data"
+    # 使用默认配置:
+    # custom_field_name = "row_kind"
+    # transform_type = SHORT
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "append_only_data"
+  }
+}
+```
 
+**数据转换过程:**
+
+```
+输入数据(CDC 格式):
+  1. RowKind=+I, id=1, name="张三", age=25
+  2. RowKind=-U, id=1, name="张三", age=25
+  3. RowKind=+U, id=1, name="张三", age=26
+  4. RowKind=-D, id=1, name="张三", age=26
+
+输出数据(Append-Only 格式):
+  1. RowKind=+I, id=1, name="张三", age=25, row_kind="+I"
+  2. RowKind=+I, id=1, name="张三", age=25, row_kind="-U"
+  3. RowKind=+I, id=1, name="张三", age=26, row_kind="+U"
+  4. RowKind=+I, id=1, name="张三", age=26, row_kind="-D"
+```
+
+---
+
+### 示例 2:使用 FULL 格式和自定义字段名
+
+使用完整格式输出 RowKind,并自定义字段名称。
+
+```yaml
 env {
-    parallelism = 1
-    job.mode = "BATCH"
+  parallelism = 1
+  job.mode = "STREAMING"
 }
 
 source {
-    FakeSource {
-        schema = {
-            fields {
-                pk_id = bigint
-                name = string
-                score = int
-            }
-            primaryKey {
-                name = "pk_id"
-                columnNames = [pk_id]
-            }
-        }
-        rows = [
-            {
-                kind = INSERT
-                fields = [1, "A", 100]
-            },
-            {
-                kind = INSERT
-                fields = [2, "B", 100]
-            },
-            {
-                kind = INSERT
-                fields = [3, "C", 100]
-            },
-            {
-                kind = INSERT
-                fields = [4, "D", 100]
-            },
-            {
-                kind = UPDATE_BEFORE
-                fields = [1, "A", 100]
-            },
-            {
-                kind = UPDATE_AFTER
-                fields = [1, "F", 100]
-            }
-            {
-                kind = UPDATE_BEFORE
-                fields = [2, "B", 100]
-            },
-            {
-                kind = UPDATE_AFTER
-                fields = [2, "G", 100]
-            },
-            {
-                kind = DELETE
-                fields = [3, "C", 100]
-            },
-            {
-                kind = DELETE
-                fields = [4, "D", 100]
-            }
-        ]
-    }
+  MySQL-CDC {
+    plugin_output = "cdc_source"
+    server-id = 5652
+    username = "root"
+    password = "your_password"
+    table-names = ["mydb.orders"]
+    url = "jdbc:mysql://localhost:3306/mydb"
+  }
 }
 
 transform {
   RowKindExtractor {
-        custom_field_name = "custom_name"
-        transform_type = FULL
-        plugin_output = "trans_result"
+    plugin_input = "cdc_source"
+    plugin_output = "append_only_data"
+    custom_field_name = "operation_type"  # 自定义字段名
+    transform_type = FULL                 # 使用完整格式
+  }
+}
+
+sink {
+  Iceberg {
+    plugin_input = "append_only_data"
+    catalog_name = "iceberg_catalog"
+    database = "mydb"
+    table = "orders_history"
+    # Iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
+  }
+}
+```
+
+**数据转换过程:**
+
+```
+输入数据(CDC 格式):
+  1. RowKind=+I, order_id=1001, amount=100.00
+  2. RowKind=-U, order_id=1001, amount=100.00
+  3. RowKind=+U, order_id=1001, amount=150.00
+  4. RowKind=-D, order_id=1001, amount=150.00
+
+输出数据(Append-Only 格式,FULL 格式):
+  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
+  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
+  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
+  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
+```
+
+---
+
+### 示例 3:完整的测试示例(使用 FakeSource)
+
+使用 FakeSource 生成测试数据,演示各种 RowKind 的转换效果。
+
+```yaml
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake_cdc_data"
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
     }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_updated", 95]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [2, "B", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [2, "B_updated", 98]
+      },
+      {
+        kind = DELETE
+        fields = [1, "A_updated", 95]
+      }
+    ]
+  }
+}
+
+transform {
+  RowKindExtractor {
+    plugin_input = "fake_cdc_data"
+    plugin_output = "transformed_data"
+    custom_field_name = "change_type"
+    transform_type = FULL
+  }
 }
 
 sink {
   Console {
-    plugin_input = "custom_name"
+    plugin_input = "transformed_data"
   }
 }
-
 ```
 
+**预期输出:**
+
+```
++I, pk_id=1, name="A", score=100, change_type="INSERT"
++I, pk_id=2, name="B", score=100, change_type="INSERT"
++I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
++I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
++I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
+```

