This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b4e072486 [Doc][Improve] Support chinese for sinks (#6663)
7b4e072486 is described below

commit 7b4e07248623d4f6da33fa4c8cfd103478fc77d8
Author: corgy-w <[email protected]>
AuthorDate: Tue Apr 30 11:17:11 2024 +0800

    [Doc][Improve] Support chinese for sinks (#6663)
---
 docs/en/connector-v2/sink/Email.md                 |   2 +-
 docs/en/connector-v2/sink/Jdbc.md                  |   8 +
 docs/zh/connector-v2/formats/avro.md               | 111 +++++++
 docs/zh/connector-v2/formats/canal-json.md         | 115 +++++++
 .../formats/cdc-compatible-debezium-json.md        |  55 ++++
 docs/zh/connector-v2/formats/debezium-json.md      | 115 +++++++
 .../formats/kafka-compatible-kafkaconnect-json.md  |  47 +++
 docs/zh/connector-v2/formats/ogg-json.md           |  93 ++++++
 docs/zh/connector-v2/sink/Console.md               | 124 +++++++
 docs/zh/connector-v2/sink/Elasticsearch.md         | 218 +++++++++++++
 docs/zh/connector-v2/sink/Email.md                 |  89 +++++
 docs/zh/connector-v2/sink/Feishu.md                |  14 +-
 docs/zh/connector-v2/sink/HdfsFile.md              |   6 +-
 docs/zh/connector-v2/sink/Http.md                  |  63 ++++
 docs/zh/connector-v2/sink/Jdbc.md                  | 357 +++++++++++++++++++++
 docs/zh/connector-v2/sink/Kafka.md                 | 196 +++++++++++
 docs/zh/connector-v2/sink/common-options.md        |  58 ++++
 17 files changed, 1661 insertions(+), 10 deletions(-)

diff --git a/docs/en/connector-v2/sink/Email.md 
b/docs/en/connector-v2/sink/Email.md
index 4789884ca3..f2bca2783d 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -1,6 +1,6 @@
 # Email
 
-> Email source connector
+> Email sink connector
 
 ## Description
 
diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index c2591761ec..0153b9f92b 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -254,6 +254,8 @@ jdbc {
 
 Exactly-once
 
+Turn on exact one-time semantics by setting `is_exactly_once`
+
 ```
 jdbc {
 
@@ -273,6 +275,8 @@ jdbc {
 
 CDC(Change data capture) event
 
+jdbc receive CDC example
+
 ```
 sink {
     jdbc {
@@ -290,6 +294,8 @@ sink {
 
 Add saveMode function
 
+To facilitate the creation of tables when they do not already exist, set the 
`schema_save_mode`  to `CREATE_SCHEMA_WHEN_NOT_EXIST`.
+
 ```
 sink {
     jdbc {
@@ -309,6 +315,8 @@ sink {
 
 Postgresql 9.5 version below support CDC(Change data capture) event
 
+For PostgreSQL versions 9.5 and below, setting `compatible_mode` to 
`postgresLow` to enable support for PostgreSQL Change Data Capture (CDC) 
operations.
+
 ```
 sink {
     jdbc {
diff --git a/docs/zh/connector-v2/formats/avro.md 
b/docs/zh/connector-v2/formats/avro.md
new file mode 100644
index 0000000000..4e19ea4b98
--- /dev/null
+++ b/docs/zh/connector-v2/formats/avro.md
@@ -0,0 +1,111 @@
+# Avro 格式
+
+Avro 在流式数据处理管道中非常流行。现在seatunnel在kafka连接器中支持Avro格式
+
+# 怎样用
+
+## Kafka 使用示例
+
+- 模拟随机生成数据源,并以 Avro 的格式 写入 Kafka 的实例
+
+```bash
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 90
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_avro_topic_fake_source"
+    format = avro
+  }
+}
+```
+
+- 从 kafka 读取 avro 格式的数据并打印到控制台的示例
+
+```bash
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_avro_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    format = avro
+    format_error_handle_way = skip
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "kafka_table"
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/canal-json.md 
b/docs/zh/connector-v2/formats/canal-json.md
new file mode 100644
index 0000000000..92c4338eb5
--- /dev/null
+++ b/docs/zh/connector-v2/formats/canal-json.md
@@ -0,0 +1,115 @@
+# Canal 格式
+
+变更数据捕获格式:
+序列化模式、反序列化模式
+
+Canal是一款CDC(变更数据捕获)工具,能够实时捕获MySQL的数据变化并将其流式传输到其他系统中。Canal为变更日志提供了一种统一的格式,并支持使用 
JSON 和 protobuf(Canal默认使用protobuf)进行消息的序列化
+
+SeaTunnel 能够解析 Canal 的 JSON 消息,并将其转化为 INSERT/UPDATE/DELETE 消息,进而输入到 SeaTunnel 
系统中。这个特性在很多场景下都显得非常有用,例如:
+
+        将增量数据从数据库同步到其他系统
+        审计日志
+        数据库的实时物化视图
+        关联维度数据库的变更历史,等等。
+
+SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息编码为 Canal JSON 消息,并将其发送到类似 
Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 
合并为一个单一的UPDATE消息。因此,SeaTunnel将 UPDATE_BEFORE 和 UPDATE_AFTER 编码为 Canal的 DELETE 和 
INSERT 消息来进行
+
+# 格式选项
+
+|               选项               |  默认值   | 是否需要 |                             
            描述                                         |
+|--------------------------------|--------|------|------------------------------------------------------------------------------------|
+| format                         | (none) | 是    | 指定要使用的格式,这里应该是 `canal_json` 
                                                       |
+| canal_json.ignore-parse-errors | false  | 否    | 
跳过解析错误的字段和行,而不是失败。出现错误的字段将被设置为null                                              
   |
+| canal_json.database.include    | (none) | 否    | 正则表达式,可选,通过正则匹配 Canal 
记录中的`database`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 |
+| canal_json.table.include       | (none) | 否    | 正则表达式,可选,通过正则匹配 Canal 
记录中的`table`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容    |
+
+# 如何使用
+
+## Kafka 使用示例
+
+Canal为变更日志提供了一种统一的格式,以下是一个从MySQL products 表捕获的变更操作的简单示例
+
+```bash
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+}
+```
+
+注:请参考 [Canal 文档](https://github.com/alibaba/canal/wiki) 以了解每个字段的含义
+
+MySQL 的 products 表有 4 列(id、name、description 和 weight)
+上述 JSON 消息是产品表的一个更新变更事件,其中 id = 111 的行的 weight 值从 5.15 变为 5.18
+假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个主题并体现变更事件
+
+```bash
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "products_binlog"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = canal_json
+  }
+
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "consume-binlog"
+    format = canal_json
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md 
b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
new file mode 100644
index 0000000000..e34a5b39a2
--- /dev/null
+++ b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -0,0 +1,55 @@
+# CDC 兼容 Debezium-json
+
+SeaTunnel 支持将 cdc 记录解析为 Debezium-JSON 消息,并发布到 MQ (kafka) 等消息系统中
+
+这个特性在很多场景下都非常实用,例如,它可以实现与 Debezium 生态系统的兼容性
+
+# 如何使用
+
+## MySQL-CDC 流入 Kafka
+
+```bash
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 15000
+}
+
+source {
+  MySQL-CDC {
+    result_table_name = "table1"
+
+    base-url="jdbc:mysql://localhost:3306/test"
+    "startup.mode"=INITIAL
+    table-names=[
+        "database1.t1",
+        "database1.t2",
+        "database2.t1"
+    ]
+
+    # compatible_debezium_json options
+    format = compatible_debezium_json
+    debezium = {
+        # include schema into kafka message
+        key.converter.schemas.enable = false
+        value.converter.schemas.enable = false
+        # include ddl
+        include.schema.changes = true
+        # topic prefix
+        database.server.name =  "mysql_cdc_1"
+    }
+  }
+}
+
+sink {
+  Kafka {
+    source_table_name = "table1"
+
+    bootstrap.servers = "localhost:9092"
+
+    # compatible_debezium_json options
+    format = compatible_debezium_json
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/debezium-json.md 
b/docs/zh/connector-v2/formats/debezium-json.md
new file mode 100644
index 0000000000..9fef845449
--- /dev/null
+++ b/docs/zh/connector-v2/formats/debezium-json.md
@@ -0,0 +1,115 @@
+# Debezium 格式
+
+变更数据捕获格式:
+序列化模式、反序列化模式
+
+Debezium 是一套分布式服务,用于捕获数据库中的变化,以便您的应用程序可以看到这些变化并对其做出响应。Debezium 
在变更事件流中记录每个数据库表中的所有行级变化,应用程序只需读取这些流,就可以按照它们发生的顺序看到变更事件
+
+SeaTunnel 支持将 Debezium JSON 消息解析为 INSERT/UPDATE/DELETE 消息并导入到 seatunnel 
系统中。在许多情况下,利用这个特性是非常有用的,例如:
+
+        将增量数据从数据库同步到其他系统
+        审计日志
+        数据库的实时物化视图
+        关联维度数据库的变更历史,等等。
+
+SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息解析为 Debezium JSON 
消息,并将其发送到类似 Kafka 这样的存储中
+
+# 格式选项
+
+|                选项                 |  默认值   | 是否需要 |                  描述      
            |
+|-----------------------------------|--------|------|--------------------------------------|
+| format                            | (none) | 是    | 指定要使用的格式,这里应该是 
'debezium_json'.      |
+| debezium-json.ignore-parse-errors | false  | 否    | 
跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null |
+
+# 如何使用
+
+## Kafka 使用示例
+
+Debezium 提供了一个统一的变更日志格式,下面是一个 MySQL products 表捕获的变更操作的简单示例
+
+```bash
+{
+       "before": {
+               "id": 111,
+               "name": "scooter",
+               "description": "Big 2-wheel scooter ",
+               "weight": 5.18
+       },
+       "after": {
+               "id": 111,
+               "name": "scooter",
+               "description": "Big 2-wheel scooter ",
+               "weight": 5.17
+       },
+       "source": {
+               "version": "1.1.1.Final",
+               "connector": "mysql",
+               "name": "dbserver1",
+               "ts_ms": 1589362330000,
+               "snapshot": "false",
+               "db": "inventory",
+               "table": "products",
+               "server_id": 223344,
+               "gtid": null,
+               "file": "mysql-bin.000003",
+               "pos": 2090,
+               "row": 0,
+               "thread": 2,
+               "query": null
+       },
+       "op": "u",
+       "ts_ms": 1589362330904,
+       "transaction": null
+}
+```
+
+注:请参考 [Debezium 
文档](https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-events)
 以了解每个字段的含义
+
+MySQL 的 products 表有 4 列(id、name、description 和 weight)
+上述 JSON 消息是产品表的一个更新变更事件,其中 id = 111 的行的 weight 值从 5.18 变为 5.17
+假设消息已经同步到 Kafka 主题 products_binlog,那么我们可以使用以下的 SeaTunnel 配置来消费这个主题并通过 Debezium 
格式解释变更事件。
+
+在此配置中,您必须指定 `schema` 和 `debezium_record_include_schema` 选项:
+- `schema` 应与您的表格式相同
+- 如果您的 json 数据包含 `schema` 字段,`debezium_record_include_schema` 应为 true,如果您的 
json 数据不包含 `schema` 字段,`debezium_record_include_schema` 应为 false
+- `{"schema" : {}, "payload": { "before" : {}, "after": {} ... } }` --> `true`
+- `{"before" : {}, "after": {} ... }` --> `false`"
+
+```bash
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "products_binlog"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    }
+    debezium_record_include_schema = false
+    format = debezium_json
+  }
+
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "consume-binlog"
+    format = debezium_json
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/kafka-compatible-kafkaconnect-json.md 
b/docs/zh/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
new file mode 100644
index 0000000000..d0ceb58ac6
--- /dev/null
+++ b/docs/zh/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
@@ -0,0 +1,47 @@
+# Kafka source 兼容 kafka-connect-json
+
+Seatunnel 的 Kafka 连接器支持解析通过 Kafka Connect Source 抽取的数据,特别是从 Kafka Connect JDBC 
和 Kafka Connect Debezium 抽取的数据
+
+# 如何使用
+
+## Kafka 流入 Mysql
+
+```bash
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "jdbc_source_record"
+    result_table_name = "kafka_table"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = COMPATIBLE_KAFKA_CONNECT_JSON
+  }
+}
+
+
+sink {
+    Jdbc {
+        driver = com.mysql.cj.jdbc.Driver
+        url = "jdbc:mysql://localhost:3306/seatunnel"
+        user = st_user
+        password = seatunnel
+        generate_sink_sql = true
+        database = seatunnel
+        table = jdbc_sink
+        primary_keys = ["id"]
+    }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/ogg-json.md 
b/docs/zh/connector-v2/formats/ogg-json.md
new file mode 100644
index 0000000000..ef2753b114
--- /dev/null
+++ b/docs/zh/connector-v2/formats/ogg-json.md
@@ -0,0 +1,93 @@
+# Ogg 格式
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a 
ogg) 
是一项托管服务,提供实时数据网格平台,该平台使用复制来保持数据高度可用,并支持实时分析。客户可以设计、执行和监控其数据复制和流数据处理解决方案,而无需分配或管理计算环境。
 Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。
+
+SeaTunnel 支持将 Ogg JSON 消息解释为 Seatunnel 系统中的 INSERT/UPDATE/DELETE 
消息。在许多情况下,这个特性带来了很多便利,例如
+
+        将增量数据从数据库同步到其他系统
+        审计日志
+        数据库的实时物化视图
+        关联维度数据库的变更历史,等等。
+
+SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息转化为 Ogg JSON 消息,并将其发送到类似 
Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 
消息。因此,Seatunnel 将 UPDATE_BEFORE 和 UPDATE_AFTER 转化为 DELETE 和 INSERT Ogg 消息来实现
+
+# 格式选项
+
+|              选项              |  默认值   | 是否需要 |                               
          描述                                         |
+|------------------------------|--------|------|------------------------------------------------------------------------------------|
+| format                       | (none) | 是    | 指定要使用的格式,这里应该是`-json`         
                                                     |
+| ogg_json.ignore-parse-errors | false  | 否    | 
跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null                                            
   |
+| ogg_json.database.include    | (none) | 否    | 正则表达式,可选,通过正则匹配 Canal 
记录中的`database`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 |
+| ogg_json.table.include       | (none) | 否    | 正则表达式,可选,通过正则匹配 Canal 记录中的 
`table` 元字段来仅读取特定表的更改日志行。此字符串Pattern模式与Java的Pattern兼容   |
+
+# 如何使用 Ogg 格式
+
+## Kafka 使用示例
+
+Ogg 为变更日志提供了统一的格式,下面是从 Oracle PRODUCTS 表捕获变更操作的简单示例:
+
+```bash
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+注:各字段含义请参考 [Debezium 
文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events)注:各字段含义请参考
 [Debezium 
文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events)
+
+此 Oracle PRODUCTS 表有 4 列 (id, name, description 和 weight)
+上面的 JSON 消息是 products 表上的更新更改事件,其中 id = 111 的行的字段 `weight` 的值从 5.18 更改为 5.15。
+假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个 topic 并体现变更事件。
+
+```bash
+env {
+    parallelism = 1
+    job.mode = "STREAMING"
+}
+source {
+  Kafka {
+    bootstrap.servers = "127.0.0.1:9092"
+    topic = "ogg"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "double"
+      }
+    },
+    format = ogg_json
+  }
+}
+sink {
+    jdbc {
+        url = "jdbc:mysql://127.0.0.1/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "12345678"
+        table = "ogg"
+        primary_keys = ["id"]
+    }
+}
+```
+
diff --git a/docs/zh/connector-v2/sink/Console.md 
b/docs/zh/connector-v2/sink/Console.md
new file mode 100644
index 0000000000..43dff33513
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Console.md
@@ -0,0 +1,124 @@
+# Console
+
+> Console 数据接收器
+
+## 支持连接器版本
+
+- 所有版本
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 描述
+
+接收Source端传入的数据并打印到控制台。支持批同步和流同步两种模式。
+
+> 例如,来自上游的数据为 [`age: 12, name: jared`] ,则发送到控制台的内容为: 
`{"name":"jared","age":17}`
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+
+## 接收器选项
+
+|         名称         |   类型    | 是否必须 | 默认值 |                        描述        
                 |
+|--------------------|---------|------|-----|---------------------------------------------------|
+| common-options     |         | 否    | -   | Sink插件常用参数,请参考 
[Sink常用选项](common-options.md) 了解详情 |
+| log.print.data     | boolean | 否    | -   | 确定是否应在日志中打印数据的标志。默认值为`true`      
                 |
+| log.print.delay.ms | int     | 否    | -   | 将每个数据项打印到日志之间的延迟(以毫秒为单位)。默认值为`0` 
                 |
+
+## 任务示例
+
+### 简单示例:
+
+> 随机生成的数据,包含两个字段,即 `name`(字符串类型)和 `age`(整型),写入控制台,并行度为 `1`
+
+```
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake"
+  }
+}
+```
+
+### 多数据源示例:
+
+> 多数据源示例,通过配置可以指定数据源写入指定接收器
+
+```
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake1"
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        sex = "string"
+      }
+    }
+  }
+   FakeSource {
+    result_table_name = "fake2"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Console {
+    source_table_name = "fake2"
+  }
+}
+```
+
+## 控制台示例数据
+
+控制台打印的输出:
+
+```
+2022-12-19 11:01:45,417 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
output rowType: name<STRING>, age<INT>
+2022-12-19 11:01:46,489 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
CpiOd, 8520946
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
eQqTs, 1256802974
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
UsRgO, 2053193072
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
jDQJj, 1993016602
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
rqdKp, 1392682764
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
wCoWN, 986999925
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
qomTU, 72775247
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
jcqXR, 1074529204
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
AkWIO, 1961723427
+2022-12-19 11:01:46,490 INFO  
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - 
subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: 
hBoib, 929089763
+```
+
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md 
b/docs/zh/connector-v2/sink/Elasticsearch.md
new file mode 100644
index 0000000000..edf974d8fb
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -0,0 +1,218 @@
+# Elasticsearch
+
+## 描述
+
+输出数据到 `Elasticsearch`
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+:::tip
+
+引擎支持
+
+* 支持  `ElasticSearch 版本 >= 2.x 并且 <= 8.x`
+
+:::
+
+## 选项
+
+|           名称            |   类型    | 是否必须 |             默认值              |
+|-------------------------|---------|------|------------------------------|
+| hosts                   | array   | 是    | -                            |
+| index                   | string  | 是    | -                            |
+| schema_save_mode        | string  | 是    | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode          | string  | 是    | APPEND_DATA                  |
+| index_type              | string  | 否    |                              |
+| primary_keys            | list    | 否    |                              |
+| key_delimiter           | string  | 否    | `_`                          |
+| username                | string  | 否    |                              |
+| password                | string  | 否    |                              |
+| max_retry_count         | int     | 否    | 3                            |
+| max_batch_size          | int     | 否    | 10                           |
+| tls_verify_certificate  | boolean | 否    | true                         |
+| tls_verify_hostnames    | boolean | 否    | true                         |
+| tls_keystore_path       | string  | 否    | -                            |
+| tls_keystore_password   | string  | 否    | -                            |
+| tls_truststore_path     | string  | 否    | -                            |
+| tls_truststore_password | string  | 否    | -                            |
+| common-options          |         | 否    | -                            |
+
+### hosts [array]
+
+`Elasticsearch` 集群http地址,格式为 `host:port` ,允许指定多个主机。例如 `["host1:9200", 
"host2:9200"]`
+
+### index [string]
+
+`Elasticsearch` 的 `index` 名称。索引支持包含字段名变量,例如 `seatunnel_${age}`,并且该字段必须出现在 
seatunnel Row 中。如果没有,我们将把它视为普通索引
+
+### index_type [string]
+
+`Elasticsearch` 索引类型,elasticsearch 6及以上版本建议不要指定
+
+### primary_keys [list]
+
+主键字段用于生成文档 `_id` ,这是 CDC 必需的选项。
+
+### key_delimiter [string]
+
+设定复合键的分隔符(默认为 `_`),例如,如果使用 `$` 作为分隔符,那么文档的 `_id` 将呈现为 `KEY1$KEY2$KEY3` 的格式
+
+### username [string]
+
+x-pack 用户名
+
+### password [string]
+
+x-pack 密码
+
+### max_retry_count [int]
+
+批次批量请求最大尝试大小
+
+### max_batch_size [int]
+
+批次批量文档最大大小
+
+### tls_verify_certificate [boolean]
+
+为 HTTPS 端点启用证书验证
+
+### tls_verify_hostname [boolean]
+
+为 HTTPS 端点启用主机名验证
+
+### tls_keystore_path [string]
+
+指向 PEM 或 JKS 密钥存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件
+
+### tls_keystore_password [string]
+
+指定的密钥存储的密钥密码
+
+### tls_truststore_path [string]
+
+指向 PEM 或 JKS 信任存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件
+
+### tls_truststore_password [string]
+
+指定的信任存储的密钥密码
+
+### common options
+
+Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情
+
+### schema_save_mode
+
+在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案<br/>
+选项介绍:<br/>
+`RECREATE_SCHEMA` :当表不存在时会创建,当表已存在时会删除并重建<br/>
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时会创建,当表已存在时则跳过创建<br/>
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :当表不存在时将抛出错误<br/>
+
+### data_save_mode
+
+在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案<br/>
+选项介绍:<br/>
+`DROP_DATA`: 保留数据库结构,删除数据<br/>
+`APPEND_DATA`:保留数据库结构,保留数据<br/>
+`ERROR_WHEN_DATA_EXISTS`:当有数据时抛出错误<br/>
+
+## 示例
+
+简单示例
+
+```bash
+sink {
+    Elasticsearch {
+        hosts = ["localhost:9200"]
+        index = "seatunnel-${age}"
+    }
+}
+```
+
+变更数据捕获 (Change data capture) 事件
+
+```bash
+sink {
+    Elasticsearch {
+        hosts = ["localhost:9200"]
+        index = "seatunnel-${age}"
+        
+        # CDC required options
+        primary_keys = ["key1", "key2", ...]
+    }
+}
+```
+
+SSL 禁用证书验证
+
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200";]
+        username = "elastic"
+        password = "elasticsearch"
+        
+        tls_verify_certificate = false
+    }
+}
+```
+
+SSL 禁用主机名验证
+
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200";]
+        username = "elastic"
+        password = "elasticsearch"
+        
+        tls_verify_hostname = false
+    }
+}
+```
+
+SSL 启用证书验证
+
+通过设置 `tls_keystore_path` 与 `tls_keystore_password` 指定证书路径及密码
+
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200";]
+        username = "elastic"
+        password = "elasticsearch"
+        
+        tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
+        tls_keystore_password = "${your password}"
+    }
+}
+```
+
+配置表生成策略 (schema_save_mode)
+
+通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
+
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200";]
+        username = "elastic"
+        password = "elasticsearch"
+        
+        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+        data_save_mode = "APPEND_DATA"
+    }
+}
+```
+
+## 变更日志
+
+### 下一版本
+
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3673](https://github.com/apache/seatunnel/pull/3673))
+- [Feature] Support https protocol & compatible with opensearch 
([3997](https://github.com/apache/seatunnel/pull/3997))
+
diff --git a/docs/zh/connector-v2/sink/Email.md 
b/docs/zh/connector-v2/sink/Email.md
new file mode 100644
index 0000000000..cc3999c580
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Email.md
@@ -0,0 +1,89 @@
+# Email
+
+> Email 数据接收器
+
+## 描述
+
+将接收的数据作为文件发送到电子邮件
+
+## 支持版本
+
+测试版本:1.5.6(供参考)
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+
+## 选项
+
+|            名称            |   类型   | 是否必须 | 默认值 |
+|--------------------------|--------|------|-----|
+| email_from_address       | string | 是    | -   |
+| email_to_address         | string | 是    | -   |
+| email_host               | string | 是    | -   |
+| email_transport_protocol | string | 是    | -   |
+| email_smtp_auth          | string | 是    | -   |
+| email_authorization_code | string | 是    | -   |
+| email_message_headline   | string | 是    | -   |
+| email_message_content    | string | 是    | -   |
+| common-options           |        | 否    | -   |
+
+### email_from_address [string]
+
+发件人邮箱地址
+
+### email_to_address [string]
+
+接收邮件的地址
+
+### email_host [string]
+
+连接的SMTP服务器地址
+
+### email_transport_protocol [string]
+
+加载会话的协议
+
+### email_smtp_auth [string]
+
+是否对客户进行认证
+
+### email_authorization_code [string]
+
+授权码,您可以从邮箱设置中获取授权码
+
+### email_message_headline [string]
+
+邮件的标题
+
+### email_message_content [string]
+
+邮件消息的正文
+
+### common options
+
+Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情.
+
+## 示例
+
+```bash
+
+ EmailSink {
+      email_from_address = "[email protected]"
+      email_to_address = "[email protected]"
+      email_host="smtp.qq.com"
+      email_transport_protocol="smtp"
+      email_smtp_auth="true"
+      email_authorization_code=""
+      email_message_headline=""
+      email_message_content=""
+   }
+
+```
+
+## 变更日志
+
+### 2.2.0-beta 2022-09-26
+
+- 添加 Email 接收器连接器
+
diff --git a/docs/zh/connector-v2/sink/Feishu.md 
b/docs/zh/connector-v2/sink/Feishu.md
index 01cdc7a482..c561e50a97 100644
--- a/docs/zh/connector-v2/sink/Feishu.md
+++ b/docs/zh/connector-v2/sink/Feishu.md
@@ -10,8 +10,8 @@
 
 ## 主要特性
 
-- [ ] [精确一次](../../../en/concept/connector-v2-features.md)
-- [ ] [变更数据捕获](../../../en/concept/connector-v2-features.md)
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [变更数据捕获](../../concept/connector-v2-features.md)
 
 ## 描述
 
@@ -42,11 +42,11 @@
 
 ## 接收器选项
 
-|       名称       |   类型   | 是否必需 | 默认值 |                                       
  描述                                         |
-|----------------|--------|------|-----|------------------------------------------------------------------------------------|
-| url            | String | 是    | -   | 飞书web hook URL                        
                                             |
-| headers        | Map    | 否    | -   | HTTP 请求头                              
                                             |
-| common-options |        | 否    | -   | 接收器插件常见参数,请参阅 
[接收器通用选项](../../../en/connector-v2/source/common-options.md) 以获取详细信息 |
+|       名称       |   类型   | 是否必需 | 默认值 |                         描述            
             |
+|----------------|--------|------|-----|----------------------------------------------------|
+| url            | String | 是    | -   | 飞书web hook URL                        
             |
+| headers        | Map    | 否    | -   | HTTP 请求头                              
             |
+| common-options |        | 否    | -   | 接收器插件常见参数,请参阅 
[接收器通用选项](common-options.md) 以获取详细信息 |
 
 ## 任务示例
 
diff --git a/docs/zh/connector-v2/sink/HdfsFile.md 
b/docs/zh/connector-v2/sink/HdfsFile.md
index 1ed6ea2d56..dee466770e 100644
--- a/docs/zh/connector-v2/sink/HdfsFile.md
+++ b/docs/zh/connector-v2/sink/HdfsFile.md
@@ -60,13 +60,15 @@
 | kerberos_principal               | string  | 否    | -                        
                  | kerberos 的主体                                                
                                                                                
                                                                                
                                                                     |
 | kerberos_keytab_path             | string  | 否    | -                        
                  | kerberos 的 keytab 路径                                        
                                                                                
                                                                                
                                                                     |
 | compress_codec                   | string  | 否    | none                     
                  | 压缩编解码器                                                      
                                                                                
                                                                                
                                                                     |
-| common-options                   | object  | 否    | -                        
                  | 接收器插件通用参数,请参阅 
[接收器通用选项](../../../en/connector-v2/source/common-options.md) 了解详情               
                                                                                
                                                                                
                                   |
+| common-options                   | object  | 否    | -                        
                  | 接收器插件通用参数,请参阅 [接收器通用选项](common-options.md) 了解详情             
                                                                                
                                                                                
                                                                     |
 | max_rows_in_memory               | int     | 否    | -                        
                  | 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。  
                                                                                
                                                                                
                                                                     |
 | sheet_name                       | string  | 否    | Sheet${Random number}    
                  | 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名                    
                                                                                
                                                                                
                                                                     |
 
 ### 提示
 
-> 如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 
版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 
`${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。
+> 如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是
+> 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop
+> jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。
 
 ## 任务示例
 
diff --git a/docs/zh/connector-v2/sink/Http.md 
b/docs/zh/connector-v2/sink/Http.md
new file mode 100644
index 0000000000..f837380efd
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Http.md
@@ -0,0 +1,63 @@
+# Http
+
+> Http 数据接收器
+
+## 支持引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## 描述
+
+接收Source端传入的数据,利用数据触发 web hooks。
+
+> 例如,来自上游的数据为[`age: 12, name: tyrantlucifer`],则body内容如下:`{"age": 12, "name": 
"tyrantlucifer"}`
+
+**Tips: Http 接收器仅支持 `post json` 类型的 web hook,source 数据将被视为 webhook 中的 body 
内容。**
+
+## 支持的数据源信息
+
+想使用 Http 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
+
+| 数据源  | 支持版本 |                                                  依赖            
                                      |
+|------|------|------------------------------------------------------------------------------------------------------|
+| Http | 通用   | 
[下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http)
 |
+
+## 接收器选项
+
+|             名称              |   类型   | 是否必须 |  默认值  |                        
 描述                         |
+|-----------------------------|--------|------|-------|----------------------------------------------------|
+| url                         | String | 是    | -     | Http 请求链接              
                            |
+| headers                     | Map    | 否    | -     | Http 标头                
                            |
+| retry                       | Int    | 否    | -     | 
如果请求http返回`IOException`的最大重试次数                     |
+| retry_backoff_multiplier_ms | Int    | 否    | 100   | http请求失败,重试回退次数(毫秒)乘数  
                            |
+| retry_backoff_max_ms        | Int    | 否    | 10000 | http请求失败,最大重试回退时间(毫秒)  
                            |
+| connect_timeout_ms          | Int    | 否    | 12000 | 连接超时设置,默认12s           
                            |
+| socket_timeout_ms           | Int    | 否    | 60000 | 套接字超时设置,默认为60s         
                            |
+| common-options              |        | 否    | -     | Sink插件常用参数,请参考 
[Sink常用选项 ](common-options.md) 了解详情 |
+
+## 示例
+
+简单示例:
+
+```hocon
+Http {
+    url = "http://localhost/test/webhook";
+    headers {
+        token = "9e32e859ef044462a257e1fc76730066"
+    }
+}
+```
+
+## 变更日志
+
+### 2.2.0-beta 2022-09-26
+
+- 添加Http接收连接器
+
diff --git a/docs/zh/connector-v2/sink/Jdbc.md 
b/docs/zh/connector-v2/sink/Jdbc.md
new file mode 100644
index 0000000000..e9c2665b75
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -0,0 +1,357 @@
+# JDBC
+
+> JDBC 数据接收器
+
+## 描述
+
+通过jdbc写入数据。支持批处理模式和流处理模式,支持并发写入,支持精确一次语义(使用XA事务保证)
+
+## 使用依赖
+
+### 用于Spark/Flink引擎
+
+> 1. 需要确保jdbc驱动jar包已经放在目录`${SEATUNNEL_HOME}/plugins/`下。
+
+### 适用于 SeaTunnel Zeta 引擎
+
+> 1. 需要确保jdbc驱动jar包已经放到`${SEATUNNEL_HOME}/lib/`目录下。
+
+## 主要特性
+
+- [x] [精确一次](../../concept/connector-v2-features.md)
+
+使用 `Xa transactions` 来确保 `exactly-once`。所以仅对于支持 `Xa transactions` 的数据库支持 
`exactly-once`
+。你可以设置 `is_exactly_once=true` 来启用它。
+
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+|                    名称                     |   类型    | 是否必须 |             默认值 
             |
+|-------------------------------------------|---------|------|------------------------------|
+| url                                       | String  | 是    | -               
             |
+| driver                                    | String  | 是    | -               
             |
+| user                                      | String  | 否    | -               
             |
+| password                                  | String  | 否    | -               
             |
+| query                                     | String  | 否    | -               
             |
+| compatible_mode                           | String  | 否    | -               
             |
+| database                                  | String  | 否    | -               
             |
+| table                                     | String  | 否    | -               
             |
+| primary_keys                              | Array   | 否    | -               
             |
+| support_upsert_by_query_primary_key_exist | Boolean | 否    | false           
             |
+| connection_check_timeout_sec              | Int     | 否    | 30              
             |
+| max_retries                               | Int     | 否    | 0               
             |
+| batch_size                                | Int     | 否    | 1000            
             |
+| is_exactly_once                           | Boolean | 否    | false           
             |
+| generate_sink_sql                         | Boolean | 否    | false           
             |
+| xa_data_source_class_name                 | String  | 否    | -               
             |
+| max_commit_attempts                       | Int     | 否    | 3               
             |
+| transaction_timeout_sec                   | Int     | 否    | -1              
             |
+| auto_commit                               | Boolean | 否    | true            
             |
+| field_ide                                 | String  | 否    | -               
             |
+| properties                                | Map     | 否    | -               
             |
+| common-options                            |         | 否    | -               
             |
+| schema_save_mode                          | Enum    | 否    | 
CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode                            | Enum    | 否    | APPEND_DATA     
             |
+| custom_sql                                | String  | 否    | -               
             |
+| enable_upsert                             | Boolean | 否    | true            
             |
+| use_copy_statement                        | Boolean | 否    | false           
             |
+
+### driver [string]
+
+用于连接远程数据源的 jdbc 类名,如果使用MySQL,则值为`com.mysql.cj.jdbc.Driver`
+
+### user [string]
+
+用户名
+
+### password [string]
+
+密码
+
+### url [string]
+
+JDBC 连接的 URL。参考案例:`jdbc:postgresql://localhost/test`
+
+### query [string]
+
+使用 sql 语句将上游输入数据写入到数据库。如 `INSERT ...`
+
+### compatible_mode [string]
+
+数据库的兼容模式,当数据库支持多种兼容模式时需要。例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。
+
+Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
+
+### database [string]
+
+使用此 `database` 和 `table-name` 自动生成 SQL,并接收上游输入的数据写入数据库。
+
+此选项与 `query` 选项是互斥的,此选项具有更高的优先级。
+
+### table [string]
+
+使用 `database` 和此 `table-name` 自动生成 SQL,并接收上游输入的数据写入数据库。
+
+此选项与 `query` 选项是互斥的,此选项具有更高的优先级。
+
+table参数可以填入一个任意的表名,这个名字最终会被用作创建表的表名,并且支持变量(`${table_name}`,`${schema_name}`)。
+替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${table_name}` 将替换传递给目标端的表名。
+
+mysql 接收器示例:
+
+1. test_${schema_name}_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+pgsql (Oracle Sqlserver ...) 接收器示例:
+
+1. ${schema_name}.${table_name}_test
+2. dbo.tt_${table_name}_sink
+3. public.sink_table
+
+Tip: 如果目标数据库有 SCHEMA 的概念,则表参数必须写成 `xxx.xxx`
+
+### primary_keys [array]
+
+该选项用于辅助生成 insert、delete、update 等 sql 语句。设置了该选项,将会根据该选项生成对应的 sql 语句
+
+### support_upsert_by_query_primary_key_exist [boolean]
+
+根据查询主键是否存在来选择使用 INSERT sql、UPDATE sql 来处理变更事件(INSERT、UPDATE_AFTER)。仅当数据库不支持 
upsert 语法时才使用此配置
+**注意**:该方法性能较低
+
+### connection_check_timeout_sec [int]
+
+用于验证数据库连接的有效性时等待数据库操作完成所需的时间,单位是秒
+
+### max_retries[int]
+
+重试提交失败的最大次数(executeBatch)
+
+### batch_size[int]
+
+对于批量写入,当缓冲的记录数达到 `batch_size` 数量或者时间达到 `checkpoint.interval` 时,数据将被刷新到数据库中
+
+### is_exactly_once[boolean]
+
+是否启用通过XA事务实现的精确一次语义。开启,你还需要设置 `xa_data_source_class_name`
+
+### generate_sink_sql[boolean]
+
+根据要写入的数据库表结构生成 sql 语句
+
+### xa_data_source_class_name[string]
+
+指数据库驱动的 XA 数据源的类名。以 MySQL 为例,其类名为 
com.mysql.cj.jdbc.MysqlXADataSource。了解其他数据库的数据源类名,可以参考文档的附录部分
+
+### max_commit_attempts[int]
+
+事务提交失败的最大重试次数
+
+### transaction_timeout_sec[int]
+
+在事务开启后的超时时间,默认值为-1(即永不超时)。请注意,设置超时时间可能会影响到精确一次(exactly-once)的语义
+
+### auto_commit [boolean]
+
+默认启用自动事务提交
+
+### field_ide [String]
+
+字段 `field_ide` 用于在从 source 同步到 sink 时,确定字段是否需要转换为大写或小写。'ORIGINAL' 
表示不需要转换,'UPPERCASE' 表示转换为大写,'LOWERCASE' 表示转换为小写
+
+### properties
+
+附加连接配置参数,当属性和URL具有相同参数时,优先级由驱动程序的具体实现确定。例如,在 MySQL 中,属性配置优先于 URL。
+
+### common options
+
+Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情
+
+### schema_save_mode [Enum]
+
+在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案<br/>
+选项介绍:<br/>
+`RECREATE_SCHEMA`:当表不存在时会创建,当表已存在时会删除并重建<br/>
+`CREATE_SCHEMA_WHEN_NOT_EXIST`:当表不存在时会创建,当表已存在时则跳过创建<br/>
+`ERROR_WHEN_SCHEMA_NOT_EXIST`:当表不存在时将抛出错误<br/>
+
+### data_save_mode [Enum]
+
+在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案<br/>
+选项介绍:<br/>
+`DROP_DATA`:保留数据库结构,删除数据<br/>
+`APPEND_DATA`:保留数据库结构,保留数据<br/>
+`CUSTOM_PROCESSING`:允许用户自定义数据处理方式<br/>
+`ERROR_WHEN_DATA_EXISTS`:当有数据时抛出错误<br/>
+
+### custom_sql [String]
+
+当`data_save_mode`选择`CUSTOM_PROCESSING`时,需要填写`CUSTOM_SQL`参数。该参数通常填写一条可以执行的SQL。SQL将在同步任务之前执行
+
+### enable_upsert [boolean]
+
+启用通过主键更新插入,如果任务没有key重复数据,设置该参数为 false 可以加快数据导入速度
+
+### use_copy_statement [boolean]
+
+使用 `COPY ${table} FROM STDIN` 语句导入数据。仅支持具有 `getCopyAPI()` 
方法连接的驱动程序。例如:Postgresql
+驱动程序 `org.postgresql.Driver`
+
+注意:不支持 `MAP`、`ARRAY`、`ROW`类型
+
+## tips
+
+在 is_exactly_once = "true" 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:<br/>
+1 postgres 需要设置 `max_prepared_transactions > 1` 例如 `ALTER SYSTEM set 
max_prepared_transactions to 10` <br/>
+2 mysql 版本需要 >= `8.0.29` 并且非 root 用户需要授予 `XA_RECOVER_ADMIN` 权限。例如:将 test_db.* 
上的 XA_RECOVER_ADMIN
+授予 `'user1'@'%'`<br/>
+3 mysql可以尝试在url中添加 `rewriteBatchedStatements=true` 参数以获得更好的性能<br/>
+
+## 附录
+
+附录参数仅提供参考
+
+|    数据源     |                    driver                    |                  
              url                                 |             
xa_data_source_class_name              |                                        
            maven                                                    |
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| MySQL      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | 
com.mysql.cj.jdbc.MysqlXADataSource                | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
+| PostgreSQL | org.postgresql.Driver                        | 
jdbc:postgresql://localhost:5432/postgres                          | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                            |
+| DM         | dm.jdbc.driver.DmDriver                      | 
jdbc:dm://localhost:5236                                           | 
dm.jdbc.driver.DmdbXADataSource                    | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                            |
+| Phoenix    | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /          
                                        | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                        |
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:sqlserver://localhost:1433                                    | 
com.microsoft.sqlserver.jdbc.SQLServerXADataSource | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                            |
+| Oracle     | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                            | 
oracle.jdbc.xa.OracleXADataSource                  | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                            |
+| sqlite     | org.sqlite.JDBC                              | 
jdbc:sqlite:test.db                                                | /          
                                        | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                            |
+| GBase8a    | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | /          
                                        | 
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
 |
+| StarRocks  | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
+| db2        | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
+| saphana    | com.sap.db.jdbc.Driver                       | 
jdbc:sap://localhost:39015                                         | /          
                                        | 
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc                  
                            |
+| Doris      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
+| teradata   | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /          
                                        | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                            |
+| Redshift   | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
+| Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc:snowflake://<account_name>.snowflakecomputing.com             | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                            |
+| Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
+| Kingbase   | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                          |
+| OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
+
+## 示例
+
+简单示例
+
+```
+jdbc {
+    url = "jdbc:mysql://localhost:3306/test"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "root"
+    password = "123456"
+    query = "insert into test_table(name,age) values(?,?)"
+}
+
+```
+
+精确一次 (Exactly-once)
+
+通过设置 `is_exactly_once` 开启精确一次语义
+
+```
+jdbc {
+
+    url = "jdbc:mysql://localhost:3306/test"
+    driver = "com.mysql.cj.jdbc.Driver"
+
+    max_retries = 0
+    user = "root"
+    password = "123456"
+    query = "insert into test_table(name,age) values(?,?)"
+
+    is_exactly_once = "true"
+
+    xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+}
+```
+
+变更数据捕获 (Change data capture) 事件
+
+jdbc 接收 CDC 示例
+
+```
+sink {
+    jdbc {
+        url = "jdbc:mysql://localhost:3306"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "123456"
+        
+        database = "sink_database"
+        table = "sink_table"
+        primary_keys = ["key1", "key2", ...]
+    }
+}
+```
+
+配置表生成策略 (schema_save_mode)
+
+通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
+
+```
+sink {
+    jdbc {
+        url = "jdbc:mysql://localhost:3306"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "123456"
+        
+        database = "sink_database"
+        table = "sink_table"
+        primary_keys = ["key1", "key2", ...]
+        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+        data_save_mode="APPEND_DATA"
+    }
+}
+```
+
+支持Postgres 9.5及以下版本的 CDC 示例
+
+Postgres 9.5及以下版本,通过设置 `compatible_mode` 配置为 `postgresLow` 来支持 Postgres CDC 操作
+
+```
+sink {
+    jdbc {
+        url = "jdbc:postgresql://localhost:5432"
+        driver = "org.postgresql.Driver"
+        user = "root"
+        password = "123456"
+        compatible_mode="postgresLow"
+        database = "sink_database"
+        table = "sink_table"
+        support_upsert_by_query_primary_key_exist = true
+        generate_sink_sql = true
+        primary_keys = ["key1", "key2", ...]
+    }
+}
+
+```
+
+## 变更日志
+
+### 2.3.0-beta 2022-10-20
+
+- [BugFix] Fix JDBC split exception 
([2904](https://github.com/apache/seatunnel/pull/2904))
+- [Feature] Support Phoenix JDBC Sink 
([2499](https://github.com/apache/seatunnel/pull/2499))
+- [Feature] Support SQL Server JDBC Sink 
([2646](https://github.com/apache/seatunnel/pull/2646))
+- [Feature] Support Oracle JDBC Sink 
([2550](https://github.com/apache/seatunnel/pull/2550))
+- [Feature] Support StarRocks JDBC Sink 
([3060](https://github.com/apache/seatunnel/pull/3060))
+- [Feature] Support DB2 JDBC Sink 
([2410](https://github.com/apache/seatunnel/pull/2410))
+
+### next version
+
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3378](https://github.com/apache/seatunnel/issues/3378))
+- [Feature] Support Teradata JDBC Sink 
([3362](https://github.com/apache/seatunnel/pull/3362))
+- [Feature] Support Sqlite JDBC Sink 
([3089](https://github.com/apache/seatunnel/pull/3089))
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3378](https://github.com/apache/seatunnel/issues/3378))
+- [Feature] Support Doris JDBC Sink
+- [Feature] Support Redshift JDBC 
Sink([#3615](https://github.com/apache/seatunnel/pull/3615))
+- [Improve] Add config item enable upsert by 
query([#3708](https://github.com/apache/seatunnel/pull/3708))
+- [Improve] Add database field to sink 
config([#4199](https://github.com/apache/seatunnel/pull/4199))
+- [Improve] Add Vertica 
connector([#4303](https://github.com/apache/seatunnel/pull/4303))
+
diff --git a/docs/zh/connector-v2/sink/Kafka.md 
b/docs/zh/connector-v2/sink/Kafka.md
new file mode 100644
index 0000000000..c0ce933870
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Kafka.md
@@ -0,0 +1,196 @@
+# Kafka
+
+> Kafka 数据接收器
+
+## 支持引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+> 默认情况下,我们将使用 2pc 来保证消息只发送一次到kafka
+
+## 描述
+
+将 Rows 内容发送到 Kafka topic
+
+## 支持的数据源信息
+
+为了使用 Kafka 连接器,需要以下依赖项
+可以通过 install-plugin.sh 或从 Maven 中央存储库下载
+
+|  数据源  | 支持版本 |                                                 Maven         
                                        |
+|-------|------|-------------------------------------------------------------------------------------------------------|
+| Kafka | 通用   | 
[下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka)
 |
+
+## 接收器选项
+
+|          名称          |   类型   | 是否需要 | 默认值  |                                
                                                                                
       描述                                                                       
                                                |
+|----------------------|--------|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic                | String | 是    | -    | 当表用作接收器时,topic 名称是要写入数据的 topic 
                                                                                
                                                                                
                                                |
+| bootstrap.servers    | String | 是    | -    | Kafka brokers 使用逗号分隔           
                                                                                
                                                                                
                                                |
+| kafka.config         | Map    | 否    | -    | 除了上述 Kafka Producer 
客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 
[Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs)
                                                                                
            |
+| semantics            | String | 否    | NON  | 可以选择的语义是 
EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。                                          
                                                                                
                                                                      |
+| partition_key_fields | Array  | 否    | -    | 配置字段用作 kafka 消息的key            
                                                                                
                                                                                
                                                |
+| partition            | Int    | 否    | -    | 可以指定分区,所有消息都会发送到此分区            
                                                                                
                                                                                
                                                |
+| assign_partitions    | Array  | 否    | -    | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息  
                                                                                
                                                                                
                                                |
+| transaction_prefix   | String | 否    | -    | 
如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 
来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀                              
                                                                                
         |
+| format               | String | 否    | json | 
数据格式。默认格式是json。可选文本格式,canal-json、debezium-json 和 avro。如果使用 json 
或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅
 [debezium-json](../formats/debezium-json.md) 了解详细信息 |
+| field_delimiter      | String | 否    | ,    | 自定义数据格式的字段分隔符                  
                                                                                
                                                                                
                                                |
+| common-options       |        | 否    | -    | Sink插件常用参数,请参考 [Sink常用选项 
](common-options.md) 了解详情                                                       
                                                                                
                                                      |
+
+## 参数解释
+
+### Topic 格式
+
+目前支持两种格式:
+
+1. 填写topic名称
+
+2. 使用上游数据中的字段值作为 topic ,格式是 `${your field name}`, 其中 topic 是上游数据的其中一列的值
+
+   例如,上游数据如下:
+
+| name | age |     data      |
+|------|-----|---------------|
+| Jack | 16  | data-example1 |
+| Mary | 23  | data-example2 |
+
+如果 `${name}` 设置为 topic。因此,第一行发送到 Jack topic,第二行发送到 Mary topic。
+
+### 语义
+
+在 EXACTLY_ONCE 中,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给 
Kafka,该模式下能保证数据精确写入kafka一次,即使任务失败重试也不会出现数据重复和丢失
+在 AT_LEAST_ONCE 中,生产者将等待 Kafka 缓冲区中所有未完成的消息在检查点上被 Kafka 
生产者确认,该模式下能保证数据至少写入kafka一次,即使任务失败
+NON 不提供任何保证:如果 Kafka 代理出现问题,消息可能会丢失,并且消息可能会重复,该模式下,任务失败重试可能会产生数据丢失或重复。
+
+### 分区关键字段
+
+例如,如果你想使用上游数据中的字段值作为键,可以将这些字段名指定给此属性
+
+上游数据如下所示:
+
+| name | age |     data      |
+|------|-----|---------------|
+| Jack | 16  | data-example1 |
+| Mary | 23  | data-example2 |
+
+如果将 name 设置为 key,那么 name 列的哈希值将决定消息发送到哪个分区。
+如果没有设置分区键字段,则将发送空消息键。
+消息 key 的格式为 json,如果设置 name 为 key,例如 `{"name":"Jack"}`。
+所选的字段必须是上游数据中已存在的字段。
+
+### 分区分配
+
+假设总有五个分区,配置中的 assign_partitions 字段设置为:
+assign_partitions = ["shoe", "clothing"]
+在这种情况下,包含 "shoe" 的消息将被发送到第零个分区,因为 "shoe" 在 assign_partitions 中被标记为零, 而包含 
"clothing" 的消息将被发送到第一个分区。
+对于其他的消息,我们将使用哈希算法将它们均匀地分配到剩余的分区中。
+这个功能是通过 MessageContentPartitioner 类实现的,该类实现了 
org.apache.kafka.clients.producer.Partitioner 接口。如果我们需要自定义分区,我们需要实现这个接口。
+
+## 任务示例
+
+### 简单:
+
+> 此示例展示了如何定义一个 SeaTunnel 同步任务,该任务能够通过 FakeSource 自动产生数据并将其发送到 Kafka 
Sink。在这个例子中,FakeSource 会生成总共 16 行数据(`row.num=16`),每一行都包含两个字段,即 `name`(字符串类型)和 
`age`(整型)。最终,这些数据将被发送到名为 test_topic 的 topic 中,因此该 topic 也将包含 16 行数据。
+> 如果你还未安装和部署 SeaTunnel,你需要参照 
[安装SeaTunnel](../../start-v2/locally/deployment.md) 的指南来进行安装和部署。完成安装和部署后,你可以按照 
[快速开始使用 SeaTunnel 引擎](../../start-v2/locally/quick-start-seatunnel-engine.md) 
的指南来运行任务。
+
+```hocon
+# Defining the runtime environment
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  kafka {
+      topic = "test_topic"
+      bootstrap.servers = "localhost:9092"
+      format = json
+      kafka.request.timeout.ms = 60000
+      semantics = EXACTLY_ONCE
+      kafka.config = {
+        acks = "all"
+        request.timeout.ms = 60000
+        buffer.memory = 33554432
+      }
+  }
+}
+```
+
+### AWS MSK SASL/SCRAM
+
+将以下 `${username}` 和 `${password}` 替换为 AWS MSK 中的配置值。
+
+```hocon
+sink {
+  kafka {
+      topic = "seatunnel"
+      bootstrap.servers = "localhost:9092"
+      format = json
+      kafka.request.timeout.ms = 60000
+      semantics = EXACTLY_ONCE
+      kafka.config = {
+         security.protocol=SASL_SSL
+         sasl.mechanism=SCRAM-SHA-512
+         
sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule 
required \nusername=${username}\npassword=${password};"
+      }
+  }
+}
+```
+
+### AWS MSK IAM
+
+从 https://github.com/aws/aws-msk-iam-auth/releases 下载 
`aws-msk-iam-auth-1.1.5.jar`
+并将其放入 `$SEATUNNEL_HOME/plugin/kafka/lib` 中目录。
+请确保 IAM 策略具有 `kafka-cluster:Connect`
+如下配置:
+
+```hocon
+"Effect": "Allow",
+"Action": [
+    "kafka-cluster:Connect",
+    "kafka-cluster:AlterCluster",
+    "kafka-cluster:DescribeCluster"
+],
+```
+
+接收器配置
+
+```hocon
+sink {
+  kafka {
+      topic = "seatunnel"
+      bootstrap.servers = "localhost:9092"
+      format = json
+      kafka.request.timeout.ms = 60000
+      semantics = EXACTLY_ONCE
+      kafka.config = {
+         security.protocol=SASL_SSL
+         sasl.mechanism=AWS_MSK_IAM
+         sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule 
required;"
+         
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
+      }
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/sink/common-options.md 
b/docs/zh/connector-v2/sink/common-options.md
new file mode 100644
index 0000000000..8569b46da0
--- /dev/null
+++ b/docs/zh/connector-v2/sink/common-options.md
@@ -0,0 +1,58 @@
+# Sink 常用选项
+
+> Sink 连接器常用参数
+
+|        名称         |   类型   | 是否需要 | 默认值 |
+|-------------------|--------|------|-----|
+| source_table_name | string | 否    | -   |
+| parallelism       | int    | 否    | -   |
+
+### source_table_name [string]
+
+当不指定 `source_table_name` 时,当前插件处理配置文件中上一个插件输出的数据集 `dataset`
+
+当指定了 `source_table_name` 时,当前插件正在处理该参数对应的数据集
+
+### parallelism [int]
+
+当没有指定`parallelism`时,默认使用 env 中的 `parallelism`。
+
+当指定 `parallelism` 时,它将覆盖 env 中的 `parallelism`。
+
+## Examples
+
+```bash
+source {
+    FakeSourceStream {
+      parallelism = 2
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+}
+
+transform {
+    Filter {
+      source_table_name = "fake"
+      fields = [name]
+      result_table_name = "fake_name"
+    }
+    Filter {
+      source_table_name = "fake"
+      fields = [age]
+      result_table_name = "fake_age"
+    }
+}
+
+sink {
+    Console {
+      source_table_name = "fake_name"
+    }
+    Console {
+      source_table_name = "fake_age"
+    }
+}
+```
+
+> 如果作业只有一个 source 和一个(或零个)transform 和一个 sink ,则不需要为连接器指定 `source_table_name` 和 
`result_table_name`。
+> 如果 source 、transform 和 sink 中任意运算符的数量大于 1,则必须为作业中的每个连接器指定 
`source_table_name` 和 `result_table_name`
+

Reply via email to