This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4042767b94 [doc][transforms-v2] improve meatadata and rowkind doc (#10010)
4042767b94 is described below
commit 4042767b94ef861f7810e255a24e53c1cc5360f3
Author: Jast <[email protected]>
AuthorDate: Thu Nov 6 22:47:43 2025 +0800
[doc][transforms-v2] improve meatadata and rowkind doc (#10010)
---
docs/en/transform-v2/metadata.md | 174 ++++++++++++----
docs/en/transform-v2/rowkind-extractor.md | 327 ++++++++++++++++++++++-------
docs/zh/transform-v2/metadata.md | 176 ++++++++++++----
docs/zh/transform-v2/rowkind-extractor.md | 334 +++++++++++++++++++++++-------
4 files changed, 771 insertions(+), 240 deletions(-)
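
As a reading aid for the metadata.md changes below, here is a minimal Python sketch of the behaviour the updated doc describes: each configured metadata key is copied into a new output field and the original fields are left untouched. This is not SeaTunnel's implementation. The function name `add_metadata_fields`, the dict-based row model, and the choice to store `None` for a key the source does not provide are all assumptions made for illustration.

```python
from typing import Any, Dict

# Metadata keys listed in the "Supported Metadata Fields" table (case-sensitive).
SUPPORTED_KEYS = {"Database", "Table", "RowKind", "EventTime", "Delay", "Partition"}


def add_metadata_fields(
    row: Dict[str, Any],
    metadata: Dict[str, Any],
    metadata_fields: Dict[str, str],
) -> Dict[str, Any]:
    """Return a copy of `row` extended with the mapped metadata values.

    Hypothetical helper: `row` holds the data fields, `metadata` holds the
    per-row metadata, and `metadata_fields` mirrors the `metadata_fields`
    option (metadata key -> output field name).
    """
    out = dict(row)  # the original data fields are not modified
    for key, output_name in metadata_fields.items():
        if key not in SUPPORTED_KEYS:
            # Keys are case-sensitive, so "database" is rejected here.
            raise ValueError(f"unsupported metadata key: {key!r}")
        if output_name in out:
            # Output names must not duplicate existing field names.
            raise ValueError(f"output field already exists: {output_name!r}")
        # Assumption: a key the source does not provide is stored as None.
        out[output_name] = metadata.get(key)
    return out
```

Applied to the row from Example 1 (`id=1, name="John", age=25`) with the five mappings shown there, the sketch yields the same fields as the documented output row.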
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index 89c0bea651..be4be03c17 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -3,83 +3,171 @@
> Metadata transform plugin
## Description
-Metadata transform plugin for adding metadata fields to data
-## Available Metadata
+The Metadata transform plugin is used to extract metadata information from data rows and convert it into regular fields for subsequent processing and analysis.
-| Key | DataType | Description |
-|:---------:|:--------:|:---------------------------------------------------------------------------------------------------------|
-| Database | string | Name of the table that contain the row. |
-| Table | string | Name of the table that contain the row. |
-| RowKind | string | The type of operation |
-| EventTime | Long | The time at which the connector processed the event.And the data should be milliseconds |
-| Delay | Long | The difference between data extraction time and database change time.And the data should be milliseconds |
-| Partition | string | Contains the partition field of the corresponding number table of the row, multiple using `,` join |
+**Core Features:**
+- Extracts metadata (such as database name, table name, row type, etc.) as visible fields
+- Supports custom output field names
+- Does not modify original data fields, only adds metadata fields
-### note
- `Delay` `EventTime` only worked on cdc series connectors for now , except TiDB-CDC
+**Typical Use Cases:**
+- Recording data source (database name, table name) during CDC data synchronization
+- Tracking data change types (INSERT, UPDATE, DELETE)
+- Recording event time and delay information of data
+- Identifying data sources when merging multiple tables
+
+## Supported Metadata Fields
+
+| Metadata Key | Output Type | Description | Data Source |
+|:---------:|:--------:|:-----------------------------:|:----:|
+| Database | string | Name of the database containing the data | All connectors |
+| Table | string | Name of the table containing the data | All connectors |
+| RowKind | string | Row change type, values: +I (insert), -U (update before), +U (update after), -D (delete) | All connectors |
+| EventTime | long | Event timestamp of data change (milliseconds) | CDC connectors |
+| Delay | long | Data collection delay time (milliseconds), i.e., the difference between data extraction time and database change time | CDC connectors |
+| Partition | string | Partition information of the data, multiple partition fields separated by commas | Connectors supporting partitions |
+
+### Important Notes
+
+1. **Metadata field names are case-sensitive**: Configuration must strictly follow the Key names in the table above (e.g., `Database`, `Table`, `RowKind`, etc.)
+2. **CDC-specific fields**: `EventTime` and `Delay` are only valid when using CDC connectors (except TiDB-CDC)
## Options
-| name | type | required | default value | Description |
-|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
-| metadata_fields | map | yes | | A mapping metadata input fields and their corresponding output fields. |
+| name | type | required | default value | description |
+|:---------------:|------|:--------:|:-------------:|-------------------|
+| metadata_fields | map | no | empty map | Mapping relationship between metadata fields and output fields, format: `Metadata Key = output field name` |
### metadata_fields [map]
-A mapping between metadata fields and their respective output fields.
+Defines the mapping relationship between metadata fields and output fields.
+**Configuration Format:**
```hocon
metadata_fields {
- Database = c_database
- Table = c_table
- RowKind = c_rowKind
- EventTime = c_ts_ms
- Delay = c_delay
+ <Metadata Key> = <output field name>
+ <Metadata Key> = <output field name>
+ ...
}
```
-## Examples
+**Configuration Example:**
+```hocon
+metadata_fields {
+ Database = source_db # Map database name to source_db field
+ Table = source_table # Map table name to source_table field
+ RowKind = op_type # Map row type to op_type field
+ EventTime = event_ts # Map event time to event_ts field
+ Delay = sync_delay # Map delay time to sync_delay field
+ Partition = partition_info # Map partition info to partition_info field
+}
+```
-```yaml
+**Notes:**
+- The left side must be a supported metadata Key (see table above), and is strictly case-sensitive
+- The right side is a custom output field name, which cannot duplicate existing field names
+- You can select only the metadata fields you need, not all of them must be configured
+
+## Complete Examples
+
+### Example 1: MySQL CDC Data Synchronization, Extracting All Metadata
+Synchronizing data from MySQL database and extracting all available metadata information.
+
+```yaml
env {
- parallelism = 1
- job.mode = "STREAMING"
- checkpoint.interval = 5000
- read_limit.bytes_per_second = 7000000
- read_limit.rows_per_second = 400
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
}
source {
- MySQL-CDC {
- plugin_output = "customers_mysql_cdc"
- server-id = 5652
- username = "root"
- password = "zdyk_Dev@2024"
- table-names = ["source.user"]
- url = "jdbc:mysql://172.16.17.123:3306/source"
- }
+ MySQL-CDC {
+ plugin_output = "mysql_cdc_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["mydb.users"]
+ url = "jdbc:mysql://localhost:3306/mydb"
+ }
}
transform {
Metadata {
+ plugin_input = "mysql_cdc_source"
+ plugin_output = "metadata_added"
metadata_fields {
- Database = database
- Table = table
- RowKind = rowKind
- EventTime = ts_ms
- Delay = delay
+ Database = source_database # Extract database name
+ Table = source_table # Extract table name
+ RowKind = change_type # Extract change type
+ EventTime = event_timestamp # Extract event time
+ Delay = sync_delay_ms # Extract sync delay
}
- plugin_output = "trans_result"
}
}
sink {
Console {
- plugin_input = "custom_name"
+ plugin_input = "metadata_added"
}
}
+```
+**Input Data Example:**
```
+Original data row (from mydb.users table):
+id=1, name="John", age=25
+RowKind: +I (INSERT)
+```
+
+**Output Data Example:**
+```
+Transformed data row:
+id=1, name="John", age=25, source_database="mydb", source_table="users",
+change_type="+I", event_timestamp=1699000000000, sync_delay_ms=100
+```
+
+---
+### Example 2: Extracting Only Partial Metadata
+
+Extracting only data source information (database name and table name) for multi-table merge scenarios.
+
+```yaml
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "multi_table_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["db1.orders", "db2.orders"]
+ url = "jdbc:mysql://localhost:3306"
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "multi_table_source"
+ plugin_output = "with_source_info"
+ metadata_fields {
+ Database = db_name
+ Table = table_name
+ }
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "with_source_info"
+ url = "jdbc:mysql://localhost:3306/target_db"
+ table = "merged_orders"
+ # Target table will contain db_name and table_name fields to identify data source
+ }
+}
+```
diff --git a/docs/en/transform-v2/rowkind-extractor.md b/docs/en/transform-v2/rowkind-extractor.md
index a2ee384c34..214b215311 100644
--- a/docs/en/transform-v2/rowkind-extractor.md
+++ b/docs/en/transform-v2/rowkind-extractor.md
@@ -4,110 +4,287 @@
## Description
-transform cdc row to append only row that contains the cdc RowKind. <br />
-Example: <br />
-CDC row: -D 1, test1, test2 <br />
-transformed Row: +I 1,test1,test2,DELETE
+The RowKindExtractor transform plugin is used to convert CDC (Change Data Capture) data streams into Append-Only mode while extracting the original RowKind information as a new field.
+
+**Core Features:**
+- Converts all data rows' RowKind to `+I` (INSERT), achieving Append-Only mode
+- Saves the original RowKind information (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) to a newly added field
+- Supports both short format and full format output
+
+**Why is this plugin needed?**
+
+In CDC data synchronization scenarios, data rows carry RowKind markers (+I, -U, +U, -D) representing different change types. However, some downstream systems (such as data lakes, analytical systems) only support Append-Only mode and do not support UPDATE and DELETE operations. In such cases, you need to:
+1. Convert all data to INSERT type (Append-Only)
+2. Save the original change type as a regular field for subsequent analysis
+
+**Transformation Example:**
+
+```
+Input (CDC data):
+ RowKind: -D (DELETE)
+ Data: id=1, name="test1", age=20
+
+Output (Append-Only data):
+ RowKind: +I (INSERT)
+ Data: id=1, name="test1", age=20, row_kind="DELETE"
+```
+
+**Typical Use Cases:**
+- Writing CDC data to data lakes that only support Append mode
+- Preserving complete change history in data warehouses
+- Performing statistical analysis on different types of changes
## Options
-| name | type | required | default value |
-|-------------------|--------|----------|---------------|
-| custom_field_name | string | yes | row_kind |
-| transform_type | enum | yes | SHORT |
+| name | type | required | default value | description |
+|-------------------|--------|----------|---------------|-------------|
+| custom_field_name | string | no | row_kind | The name of the new field used to store the original RowKind information |
+| transform_type | enum | no | SHORT | The output format of RowKind, options: SHORT (short format) or FULL (full format) |
### custom_field_name [string]
-Custom field name of the RowKind field
+Specifies the name of the new field that will store the original RowKind information.
+
+**Default value:** `row_kind`
+
+**Notes:**
+- The field name cannot duplicate existing field names, otherwise an error will be thrown
+- It's recommended to use meaningful names, such as `operation_type`, `change_type`, `cdc_op`, etc.
+
+**Example:**
+```hocon
+custom_field_name = "operation_type" # Use custom field name
+```
### transform_type [enum]
-the RowKind field value formatting , the option can be `SHORT` or `FULL`
+Specifies the output format of the RowKind field value.
+
+**Available options:**
+
+| Format | Description | Output Values |
+|--------|-------------|---------------|
+| SHORT | Short format (symbol representation) | `+I`, `-U`, `+U`, `-D` |
+| FULL | Full format (English names) | `INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE` |
+
+**Default value:** `SHORT`
+
+**Meaning of each value:**
+
+| RowKind Type | SHORT Format | FULL Format | Description |
+|--------------|--------------|-------------|-------------|
+| INSERT | +I | INSERT | Insert operation |
+| UPDATE_BEFORE | -U | UPDATE_BEFORE | Value before update |
+| UPDATE_AFTER | +U | UPDATE_AFTER | Value after update |
+| DELETE | -D | DELETE | Delete operation |
+
+**Selection Recommendations:**
+- **SHORT format**: Saves storage space, suitable for storage-sensitive scenarios
+- **FULL format**: Better readability, suitable for scenarios requiring manual review or analysis
+
+**Example:**
+```hocon
+transform_type = FULL # Use full format
+```
+
+## Complete Examples
+
+### Example 1: Using Default Configuration (SHORT Format)
+
+Using default configuration to convert CDC data to Append-Only mode, with RowKind saved in short format.
+
+```yaml
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "cdc_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["mydb.users"]
+ url = "jdbc:mysql://localhost:3306/mydb"
+ }
+}
+
+transform {
+ RowKindExtractor {
+ plugin_input = "cdc_source"
+ plugin_output = "append_only_data"
+ # Using default configuration:
+ # custom_field_name = "row_kind"
+ # transform_type = SHORT
+ }
+}
+
+sink {
+ Console {
+ plugin_input = "append_only_data"
+ }
+}
+```
+
+**Data Transformation Process:**
+
+```
+Input data (CDC format):
+ 1. RowKind=+I, id=1, name="John", age=25
+ 2. RowKind=-U, id=1, name="John", age=25
+ 3. RowKind=+U, id=1, name="John", age=26
+ 4. RowKind=-D, id=1, name="John", age=26
+
+Output data (Append-Only format):
+ 1. RowKind=+I, id=1, name="John", age=25, row_kind="+I"
+ 2. RowKind=+I, id=1, name="John", age=25, row_kind="-U"
+ 3. RowKind=+I, id=1, name="John", age=26, row_kind="+U"
+ 4. RowKind=+I, id=1, name="John", age=26, row_kind="-D"
+```
-`SHORT` : +I, -U , +U, -D
-`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+---
-## Examples
+### Example 2: Using FULL Format with Custom Field Name
+Using full format to output RowKind with a custom field name.
```yaml
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "cdc_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["mydb.orders"]
+ url = "jdbc:mysql://localhost:3306/mydb"
+ }
+}
+
+transform {
+ RowKindExtractor {
+ plugin_input = "cdc_source"
+ plugin_output = "append_only_data"
+ custom_field_name = "operation_type" # Custom field name
+ transform_type = FULL # Use full format
+ }
+}
+
+sink {
+ Iceberg {
+ plugin_input = "append_only_data"
+ catalog_name = "iceberg_catalog"
+ database = "mydb"
+ table = "orders_history"
+ # Iceberg table will contain operation_type field, recording the change type of each data row
+ }
+}
+```
+
+**Data Transformation Process:**
+
+```
+Input data (CDC format):
+ 1. RowKind=+I, order_id=1001, amount=100.00
+ 2. RowKind=-U, order_id=1001, amount=100.00
+ 3. RowKind=+U, order_id=1001, amount=150.00
+ 4. RowKind=-D, order_id=1001, amount=150.00
+
+Output data (Append-Only format, FULL format):
+ 1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
+ 2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
+ 3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
+ 4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
+```
+
+---
+### Example 3: Complete Test Example (Using FakeSource)
+
+Using FakeSource to generate test data, demonstrating the transformation effects of various RowKinds.
+
+```yaml
env {
- parallelism = 1
- job.mode = "BATCH"
+ parallelism = 1
+ job.mode = "BATCH"
}
source {
- FakeSource {
- schema = {
- fields {
- pk_id = bigint
- name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, "A", 100]
- },
- {
- kind = INSERT
- fields = [2, "B", 100]
- },
- {
- kind = INSERT
- fields = [3, "C", 100]
- },
- {
- kind = INSERT
- fields = [4, "D", 100]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, "A", 100]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, "F", 100]
- }
- {
- kind = UPDATE_BEFORE
- fields = [2, "B", 100]
- },
- {
- kind = UPDATE_AFTER
- fields = [2, "G", 100]
- },
- {
- kind = DELETE
- fields = [3, "C", 100]
- },
- {
- kind = DELETE
- fields = [4, "D", 100]
- }
- ]
+ FakeSource {
+ plugin_output = "fake_cdc_data"
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
}
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_updated", 95]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "B_updated", 98]
+ },
+ {
+ kind = DELETE
+ fields = [1, "A_updated", 95]
+ }
+ ]
+ }
}
transform {
RowKindExtractor {
- custom_field_name = "custom_name"
- transform_type = FULL
- plugin_output = "trans_result"
- }
+ plugin_input = "fake_cdc_data"
+ plugin_output = "transformed_data"
+ custom_field_name = "change_type"
+ transform_type = FULL
+ }
}
sink {
Console {
- plugin_input = "custom_name"
+ plugin_input = "transformed_data"
}
}
-
```
+**Expected Output:**
+
+```
++I, pk_id=1, name="A", score=100, change_type="INSERT"
++I, pk_id=2, name="B", score=100, change_type="INSERT"
++I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
++I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
++I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
+```
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index 72ebd320f8..4a714901a7 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -1,85 +1,173 @@
# Metadata
-> Metadata transform plugin
+> Metadata 转换插件
-## Description
-元数据转换插件,用于将元数据字段添加到数据中
+## 描述
-## 支持的元数据
+Metadata 转换插件用于将数据行中的元数据信息提取并转换为普通字段,方便后续处理和分析。
-| Key | DataType | Description |
-|:---------:|:--------:|:-----------------------------:|
-| Database | string | 包含该行的数据库名 |
-| Table | string | 包含该行的数表名 |
-| RowKind | string | 行类型 |
-| EventTime | Long | 该行的对应的数据时间,统一格式是到毫秒的时间戳 |
-| Delay | Long | 数据抽取时间与数据库变更时间的差,统一格式是到毫秒的时间戳 |
-| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
+**核心功能:**
+- 将元数据(如数据库名、表名、行类型等)提取为可见字段
+- 支持自定义输出字段名称
+- 不改变原有数据字段,只是新增元数据字段
-### 注意事项
- `Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外
+**典型应用场景:**
+- CDC 数据同步时需要记录数据来源(库名、表名)
+- 需要追踪数据变更类型(INSERT、UPDATE、DELETE)
+- 需要记录数据的事件时间和延迟信息
+- 多表合并时需要标识数据来源
+
+## 支持的元数据字段
+
+| 元数据Key | 输出类型 | 说明 | 数据来源 |
+|:---------:|:--------:|:-----------------------------:|:----:|
+| Database | string | 数据所属的数据库名称 | 所有连接器 |
+| Table | string | 数据所属的表名称 | 所有连接器 |
+| RowKind | string | 行的变更类型,值为:+I(插入)、-U(更新前)、+U(更新后)、-D(删除) | 所有连接器 |
+| EventTime | long | 数据变更的事件时间戳(毫秒) | CDC 连接器 |
+| Delay | long | 数据采集延迟时间(毫秒),即数据抽取时间与数据库变更时间的差值 | CDC 连接器 |
+| Partition | string | 数据所属的分区信息,多个分区字段使用逗号分隔 | 支持分区的连接器 |
+
+### 重要说明
+
+1. **元数据字段区分大小写**:配置时必须严格按照上表中的 Key 名称(如 `Database`、`Table`、`RowKind` 等)
+2. **CDC 专有字段**:`EventTime` 和 `Delay` 仅在使用 CDC 连接器时有效(TiDB-CDC 除外)
## 配置选项
-| name | type | required | default value | Description |
+| 参数名 | 类型 | 是否必填 | 默认值 | 说明 |
|:---------------:|------|:--------:|:-------------:|-------------------|
-| metadata_fields | map | 是 | - | 元数据字段与输入字段相应的映射关系 |
+| metadata_fields | map | 否 | 空映射 | 元数据字段与输出字段的映射关系,格式为 `元数据Key = 输出字段名` |
### metadata_fields [map]
-元数据字段和相应的输出字段之间的映射关系
+定义元数据字段到输出字段的映射关系。
+**配置格式:**
```hocon
metadata_fields {
- database = c_database
- table = c_table
- rowKind = c_rowKind
- ts_ms = c_ts_ms
- delay = c_delay
+ <元数据Key> = <输出字段名>
+ <元数据Key> = <输出字段名>
+ ...
}
```
-## 示例
+**配置示例:**
+```hocon
+metadata_fields {
+ Database = source_db # 将数据库名映射到 source_db 字段
+ Table = source_table # 将表名映射到 source_table 字段
+ RowKind = op_type # 将行类型映射到 op_type 字段
+ EventTime = event_ts # 将事件时间映射到 event_ts 字段
+ Delay = sync_delay # 将延迟时间映射到 sync_delay 字段
+ Partition = partition_info # 将分区信息映射到 partition_info 字段
+}
+```
-```yaml
+**注意事项:**
+- 左侧必须是支持的元数据 Key(见上表),且严格区分大小写
+- 右侧是自定义的输出字段名,不能与原有字段重名
+- 可以只选择需要的元数据字段,不必全部配置
+
+## 完整示例
+
+### 示例 1:MySQL CDC 数据同步,提取所有元数据
+从 MySQL 数据库同步数据,并提取所有可用的元数据信息。
+
+```yaml
env {
- parallelism = 1
- job.mode = "STREAMING"
- checkpoint.interval = 5000
- read_limit.bytes_per_second = 7000000
- read_limit.rows_per_second = 400
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
}
source {
- MySQL-CDC {
- plugin_output = "customers_mysql_cdc"
- server-id = 5652
- username = "root"
- password = "zdyk_Dev@2024"
- table-names = ["source.user"]
- url = "jdbc:mysql://172.16.17.123:3306/source"
- }
+ MySQL-CDC {
+ plugin_output = "mysql_cdc_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["mydb.users"]
+ url = "jdbc:mysql://localhost:3306/mydb"
+ }
}
transform {
Metadata {
+ plugin_input = "mysql_cdc_source"
+ plugin_output = "metadata_added"
metadata_fields {
- Database = database
- Table = table
- RowKind = rowKind
- EventTime = ts_ms
- Delay = delay
+ Database = source_database # 提取数据库名
+ Table = source_table # 提取表名
+ RowKind = change_type # 提取变更类型
+ EventTime = event_timestamp # 提取事件时间
+ Delay = sync_delay_ms # 提取同步延迟
}
- plugin_output = "trans_result"
}
}
sink {
Console {
- plugin_input = "custom_name"
+ plugin_input = "metadata_added"
}
}
+```
+**输入数据示例:**
```
+原始数据行(来自 mydb.users 表):
+id=1, name="张三", age=25
+RowKind: +I (INSERT)
+```
+
+**输出数据示例:**
+```
+转换后的数据行:
+id=1, name="张三", age=25, source_database="mydb", source_table="users",
+change_type="+I", event_timestamp=1699000000000, sync_delay_ms=100
+```
+
+---
+### 示例 2:只提取部分元数据
+
+只提取数据来源信息(库名和表名),用于多表合并场景。
+
+```yaml
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "multi_table_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["db1.orders", "db2.orders"]
+ url = "jdbc:mysql://localhost:3306"
+ }
+}
+
+transform {
+ Metadata {
+ plugin_input = "multi_table_source"
+ plugin_output = "with_source_info"
+ metadata_fields {
+ Database = db_name
+ Table = table_name
+ }
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "with_source_info"
+ url = "jdbc:mysql://localhost:3306/target_db"
+ table = "merged_orders"
+ # 目标表会包含 db_name 和 table_name 字段,用于标识数据来源
+ }
+}
+```
diff --git a/docs/zh/transform-v2/rowkind-extractor.md b/docs/zh/transform-v2/rowkind-extractor.md
index cfa4d8fd6c..7813b27f6d 100644
--- a/docs/zh/transform-v2/rowkind-extractor.md
+++ b/docs/zh/transform-v2/rowkind-extractor.md
@@ -1,112 +1,290 @@
# RowKindExtractor
-> RowKindExtractor transform plugin
+> RowKindExtractor 转换插件
-## Description
+## 描述
-将CDC Row 转换为 Append only Row, 转换后的行扩展了RowKind字段 <br />
-Example: <br />
-CDC row: -D 1, test1, test2 <br />
-transformed Row: +I 1,test1,test2,DELETE
+RowKindExtractor 转换插件用于将 CDC(Change Data Capture)数据流转换为 Append-Only(仅追加)模式,同时将原始的 RowKind 信息提取为一个新的字段。
-## Options
+**核心功能:**
+- 将所有数据行的 RowKind 统一改为 `+I`(INSERT),实现 Append-Only 模式
+- 将原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
+- 支持短格式和完整格式两种输出方式
-| name | type | required | default value |
-|-------------------|--------|----------|---------------|
-| custom_field_name | string | yes | row_kind |
-| transform_type | enum | yes | SHORT |
+**为什么需要这个插件?**
+
+在 CDC 数据同步场景中,数据行带有 RowKind 标记(+I、-U、+U、-D),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此时需要:
+1. 将所有数据转换为 INSERT 类型(Append-Only)
+2. 将原始的变更类型保存为普通字段,供后续分析使用
+
+**转换示例:**
+
+```
+输入(CDC 数据):
+ RowKind: -D (DELETE)
+ 数据: id=1, name="test1", age=20
+
+输出(Append-Only 数据):
+ RowKind: +I (INSERT)
+ 数据: id=1, name="test1", age=20, row_kind="DELETE"
+```
+
+**典型应用场景:**
+- 将 CDC 数据写入只支持 Append 的数据湖
+- 需要在数据仓库中保留完整的变更历史记录
+- 需要对不同类型的变更进行统计分析
+
+## 配置选项
+
+| 参数名 | 类型 | 是否必填 | 默认值 | 说明 |
+|-------------------|--------|----------|---------------|------|
+| custom_field_name | string | 否 | row_kind | 新增字段的名称,用于存储原始的 RowKind 信息 |
+| transform_type | enum | 否 | SHORT | RowKind 的输出格式,可选值:SHORT(短格式)或 FULL(完整格式) |
### custom_field_name [string]
-RowKind列的自定义名
+指定新增字段的名称,该字段用于存储原始的 RowKind 信息。
+
+**默认值:** `row_kind`
+
+**注意事项:**
+- 字段名不能与原有字段重名,否则会报错
+- 建议使用有意义的名称,如 `operation_type`、`change_type`、`cdc_op` 等
+
+**示例:**
+```hocon
+custom_field_name = "operation_type" # 使用自定义字段名
+```
### transform_type [enum]
-格式化RowKind值 , 配置为 `SHORT` 或 `FULL`
+指定 RowKind 字段值的输出格式。
+
+**可选值:**
+
+| 格式 | 说明 | 输出值 |
+|------|------|--------|
+| SHORT | 短格式(符号表示) | `+I`、`-U`、`+U`、`-D` |
+| FULL | 完整格式(英文名称) | `INSERT`、`UPDATE_BEFORE`、`UPDATE_AFTER`、`DELETE` |
+
+**默认值:** `SHORT`
+
+**各值含义:**
+
+| RowKind 类型 | SHORT 格式 | FULL 格式 | 说明 |
+|-------------|-----------|----------|-------|
+| INSERT | +I | INSERT | 插入操作 |
+| UPDATE_BEFORE | -U | UPDATE_BEFORE | 更新前的值 |
+| UPDATE_AFTER | +U | UPDATE_AFTER | 更新后的值 |
+| DELETE | -D | DELETE | 删除操作 |
+
+**选择建议:**
+- **SHORT 格式**:节省存储空间,适合对存储敏感的场景
+- **FULL 格式**:可读性更好,适合需要人工查看或分析的场景
+
+**示例:**
+```hocon
+transform_type = FULL # 使用完整格式
+```
-`SHORT` : +I, -U , +U, -D
-`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+## 完整示例
-## Examples
+### 示例 1:使用默认配置(SHORT 格式)
+
+使用默认配置,将 CDC 数据转换为 Append-Only 模式,RowKind 以短格式保存。
```yaml
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MySQL-CDC {
+ plugin_output = "cdc_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["mydb.users"]
+ url = "jdbc:mysql://localhost:3306/mydb"
+ }
+}
+
+transform {
+ RowKindExtractor {
+ plugin_input = "cdc_source"
+ plugin_output = "append_only_data"
+ # 使用默认配置:
+ # custom_field_name = "row_kind"
+ # transform_type = SHORT
+ }
+}
+
+sink {
+ Console {
+ plugin_input = "append_only_data"
+ }
+}
+```
+**数据转换过程:**
+
+```
+输入数据(CDC 格式):
+ 1. RowKind=+I, id=1, name="张三", age=25
+ 2. RowKind=-U, id=1, name="张三", age=25
+ 3. RowKind=+U, id=1, name="张三", age=26
+ 4. RowKind=-D, id=1, name="张三", age=26
+
+输出数据(Append-Only 格式):
+ 1. RowKind=+I, id=1, name="张三", age=25, row_kind="+I"
+ 2. RowKind=+I, id=1, name="张三", age=25, row_kind="-U"
+ 3. RowKind=+I, id=1, name="张三", age=26, row_kind="+U"
+ 4. RowKind=+I, id=1, name="张三", age=26, row_kind="-D"
+```
+
+---
+
+### 示例 2:使用 FULL 格式和自定义字段名
+
+使用完整格式输出 RowKind,并自定义字段名称。
+
+```yaml
env {
- parallelism = 1
- job.mode = "BATCH"
+ parallelism = 1
+ job.mode = "STREAMING"
}
source {
- FakeSource {
- schema = {
- fields {
- pk_id = bigint
- name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, "A", 100]
- },
- {
- kind = INSERT
- fields = [2, "B", 100]
- },
- {
- kind = INSERT
- fields = [3, "C", 100]
- },
- {
- kind = INSERT
- fields = [4, "D", 100]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, "A", 100]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, "F", 100]
- }
- {
- kind = UPDATE_BEFORE
- fields = [2, "B", 100]
- },
- {
- kind = UPDATE_AFTER
- fields = [2, "G", 100]
- },
- {
- kind = DELETE
- fields = [3, "C", 100]
- },
- {
- kind = DELETE
- fields = [4, "D", 100]
- }
- ]
- }
+ MySQL-CDC {
+ plugin_output = "cdc_source"
+ server-id = 5652
+ username = "root"
+ password = "your_password"
+ table-names = ["mydb.orders"]
+ url = "jdbc:mysql://localhost:3306/mydb"
+ }
}
transform {
RowKindExtractor {
- custom_field_name = "custom_name"
- transform_type = FULL
- plugin_output = "trans_result"
+ plugin_input = "cdc_source"
+ plugin_output = "append_only_data"
+ custom_field_name = "operation_type" # 自定义字段名
+ transform_type = FULL # 使用完整格式
+ }
+}
+
+sink {
+ Iceberg {
+ plugin_input = "append_only_data"
+ catalog_name = "iceberg_catalog"
+ database = "mydb"
+ table = "orders_history"
+ # Iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
+ }
+}
+```
+
+**数据转换过程:**
+
+```
+输入数据(CDC 格式):
+ 1. RowKind=+I, order_id=1001, amount=100.00
+ 2. RowKind=-U, order_id=1001, amount=100.00
+ 3. RowKind=+U, order_id=1001, amount=150.00
+ 4. RowKind=-D, order_id=1001, amount=150.00
+
+输出数据(Append-Only 格式,FULL 格式):
+ 1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
+ 2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
+ 3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
+ 4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
+```
+
+---
+
+### 示例 3:完整的测试示例(使用 FakeSource)
+
+使用 FakeSource 生成测试数据,演示各种 RowKind 的转换效果。
+
+```yaml
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake_cdc_data"
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
}
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_updated", 95]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "B_updated", 98]
+ },
+ {
+ kind = DELETE
+ fields = [1, "A_updated", 95]
+ }
+ ]
+ }
+}
+
+transform {
+ RowKindExtractor {
+ plugin_input = "fake_cdc_data"
+ plugin_output = "transformed_data"
+ custom_field_name = "change_type"
+ transform_type = FULL
+ }
}
sink {
Console {
- plugin_input = "custom_name"
+ plugin_input = "transformed_data"
}
}
-
```
+**预期输出:**
+
+```
++I, pk_id=1, name="A", score=100, change_type="INSERT"
++I, pk_id=2, name="B", score=100, change_type="INSERT"
++I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
++I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
++I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
++I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
+```
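
As a reading aid for the rowkind-extractor.md changes above, the following Python sketch models the documented RowKindExtractor semantics: every row's RowKind becomes `+I`, and the original kind is appended as a regular field in SHORT or FULL format. This is a rough model under stated assumptions, not SeaTunnel's code. The function name `extract_row_kind` and the `(kind, fields)` row representation are hypothetical.

```python
from typing import Any, Dict, Tuple

# SHORT symbol -> FULL name, as listed in the transform_type table.
SHORT_TO_FULL = {
    "+I": "INSERT",
    "-U": "UPDATE_BEFORE",
    "+U": "UPDATE_AFTER",
    "-D": "DELETE",
}


def extract_row_kind(
    kind: str,
    fields: Dict[str, Any],
    custom_field_name: str = "row_kind",
    transform_type: str = "SHORT",
) -> Tuple[str, Dict[str, Any]]:
    """Return ("+I", fields plus one extra field holding the original kind).

    Hypothetical helper: `kind` is the row's RowKind in SHORT notation and
    `fields` maps field names to values.
    """
    if kind not in SHORT_TO_FULL:
        raise ValueError(f"unknown RowKind: {kind!r}")
    if transform_type not in ("SHORT", "FULL"):
        raise ValueError(f"unknown transform_type: {transform_type!r}")
    if custom_field_name in fields:
        # The doc states that a duplicate field name is an error.
        raise ValueError(f"field already exists: {custom_field_name!r}")
    value = kind if transform_type == "SHORT" else SHORT_TO_FULL[kind]
    out = dict(fields)  # leave the caller's dict untouched
    out[custom_field_name] = value
    return "+I", out  # every output row is an INSERT (append-only)
```

Calling it with `kind="-D"`, the fields `pk_id=1, name="A_updated", score=95`, `custom_field_name="change_type"` and `transform_type="FULL"` gives a `+I` row whose `change_type` is `"DELETE"`, matching the last line of the expected output in Example 3.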