This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b4e072486 [Doc][Improve] Support chinese for sinks (#6663)
7b4e072486 is described below
commit 7b4e07248623d4f6da33fa4c8cfd103478fc77d8
Author: corgy-w <[email protected]>
AuthorDate: Tue Apr 30 11:17:11 2024 +0800
[Doc][Improve] Support chinese for sinks (#6663)
---
docs/en/connector-v2/sink/Email.md | 2 +-
docs/en/connector-v2/sink/Jdbc.md | 8 +
docs/zh/connector-v2/formats/avro.md | 111 +++++++
docs/zh/connector-v2/formats/canal-json.md | 115 +++++++
.../formats/cdc-compatible-debezium-json.md | 55 ++++
docs/zh/connector-v2/formats/debezium-json.md | 115 +++++++
.../formats/kafka-compatible-kafkaconnect-json.md | 47 +++
docs/zh/connector-v2/formats/ogg-json.md | 93 ++++++
docs/zh/connector-v2/sink/Console.md | 124 +++++++
docs/zh/connector-v2/sink/Elasticsearch.md | 218 +++++++++++++
docs/zh/connector-v2/sink/Email.md | 89 +++++
docs/zh/connector-v2/sink/Feishu.md | 14 +-
docs/zh/connector-v2/sink/HdfsFile.md | 6 +-
docs/zh/connector-v2/sink/Http.md | 63 ++++
docs/zh/connector-v2/sink/Jdbc.md | 357 +++++++++++++++++++++
docs/zh/connector-v2/sink/Kafka.md | 196 +++++++++++
docs/zh/connector-v2/sink/common-options.md | 58 ++++
17 files changed, 1661 insertions(+), 10 deletions(-)
diff --git a/docs/en/connector-v2/sink/Email.md
b/docs/en/connector-v2/sink/Email.md
index 4789884ca3..f2bca2783d 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -1,6 +1,6 @@
# Email
-> Email source connector
+> Email sink connector
## Description
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index c2591761ec..0153b9f92b 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -254,6 +254,8 @@ jdbc {
Exactly-once
+Turn on exact one-time semantics by setting `is_exactly_once`
+
```
jdbc {
@@ -273,6 +275,8 @@ jdbc {
CDC(Change data capture) event
+jdbc receive CDC example
+
```
sink {
jdbc {
@@ -290,6 +294,8 @@ sink {
Add saveMode function
+To facilitate the creation of tables when they do not already exist, set the
`schema_save_mode` to `CREATE_SCHEMA_WHEN_NOT_EXIST`.
+
```
sink {
jdbc {
@@ -309,6 +315,8 @@ sink {
Postgresql 9.5 version below support CDC(Change data capture) event
+For PostgreSQL versions 9.5 and below, setting `compatible_mode` to
`postgresLow` to enable support for PostgreSQL Change Data Capture (CDC)
operations.
+
```
sink {
jdbc {
diff --git a/docs/zh/connector-v2/formats/avro.md
b/docs/zh/connector-v2/formats/avro.md
new file mode 100644
index 0000000000..4e19ea4b98
--- /dev/null
+++ b/docs/zh/connector-v2/formats/avro.md
@@ -0,0 +1,111 @@
+# Avro 格式
+
+Avro 在流式数据处理管道中非常流行。现在seatunnel在kafka连接器中支持Avro格式
+
+# 怎样用
+
+## Kafka 使用示例
+
+- 模拟随机生成数据源,并以 Avro 的格式 写入 Kafka 的实例
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 90
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic_fake_source"
+ format = avro
+ }
+}
+```
+
+- 从 kafka 读取 avro 格式的数据并打印到控制台的示例
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ format = avro
+ format_error_handle_way = skip
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "kafka_table"
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/canal-json.md
b/docs/zh/connector-v2/formats/canal-json.md
new file mode 100644
index 0000000000..92c4338eb5
--- /dev/null
+++ b/docs/zh/connector-v2/formats/canal-json.md
@@ -0,0 +1,115 @@
+# Canal 格式
+
+变更数据捕获格式:
+序列化模式、反序列化模式
+
+Canal是一款CDC(变更数据捕获)工具,能够实时捕获MySQL的数据变化并将其流式传输到其他系统中。Canal为变更日志提供了一种统一的格式,并支持使用
JSON 和 protobuf(Canal默认使用protobuf)进行消息的序列化
+
+SeaTunnel 能够解析 Canal 的 JSON 消息,并将其转化为 INSERT/UPDATE/DELETE 消息,进而输入到 SeaTunnel
系统中。这个特性在很多场景下都显得非常有用,例如:
+
+ 将增量数据从数据库同步到其他系统
+ 审计日志
+ 数据库的实时物化视图
+ 关联维度数据库的变更历史,等等。
+
+SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息编码为 Canal JSON 消息,并将其发送到类似
Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER
合并为一个单一的UPDATE消息。因此,SeaTunnel将 UPDATE_BEFORE 和 UPDATE_AFTER 编码为 Canal的 DELETE 和
INSERT 消息来进行
+
+# 格式选项
+
+| 选项 | 默认值 | 是否需要 |
描述 |
+|--------------------------------|--------|------|------------------------------------------------------------------------------------|
+| format | (none) | 是 | 指定要使用的格式,这里应该是 `canal_json`
|
+| canal_json.ignore-parse-errors | false | 否 |
跳过解析错误的字段和行,而不是失败。出现错误的字段将被设置为null
|
+| canal_json.database.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal
记录中的`database`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 |
+| canal_json.table.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal
记录中的`table`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 |
+
+# 如何使用
+
+## Kafka 使用示例
+
+Canal为变更日志提供了一种统一的格式,以下是一个从MySQL products 表捕获的变更操作的简单示例
+
+```bash
+{
+ "data": [
+ {
+ "id": "111",
+ "name": "scooter",
+ "description": "Big 2-wheel scooter",
+ "weight": "5.18"
+ }
+ ],
+ "database": "inventory",
+ "es": 1589373560000,
+ "id": 9,
+ "isDdl": false,
+ "mysqlType": {
+ "id": "INTEGER",
+ "name": "VARCHAR(255)",
+ "description": "VARCHAR(512)",
+ "weight": "FLOAT"
+ },
+ "old": [
+ {
+ "weight": "5.15"
+ }
+ ],
+ "pkNames": [
+ "id"
+ ],
+ "sql": "",
+ "sqlType": {
+ "id": 4,
+ "name": 12,
+ "description": 12,
+ "weight": 7
+ },
+ "table": "products",
+ "ts": 1589373560798,
+ "type": "UPDATE"
+}
+```
+
+注:请参考 [Canal 文档](https://github.com/alibaba/canal/wiki) 以了解每个字段的含义
+
+MySQL 的 products 表有 4 列(id、name、description 和 weight)
+上述 JSON 消息是产品表的一个更新变更事件,其中 id = 111 的行的 weight 值从 5.15 变为 5.18
+假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个主题并体现变更事件
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "products_binlog"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = canal_json
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "localhost:9092"
+ topic = "consume-binlog"
+ format = canal_json
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
new file mode 100644
index 0000000000..e34a5b39a2
--- /dev/null
+++ b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -0,0 +1,55 @@
+# CDC 兼容 Debezium-json
+
+SeaTunnel 支持将 cdc 记录解析为 Debezium-JSON 消息,并发布到 MQ (kafka) 等消息系统中
+
+这个特性在很多场景下都非常实用,例如,它可以实现与 Debezium 生态系统的兼容性
+
+# 如何使用
+
+## MySQL-CDC 流入 Kafka
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 15000
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "table1"
+
+ base-url="jdbc:mysql://localhost:3306/test"
+ "startup.mode"=INITIAL
+ table-names=[
+ "database1.t1",
+ "database1.t2",
+ "database2.t1"
+ ]
+
+ # compatible_debezium_json options
+ format = compatible_debezium_json
+ debezium = {
+ # include schema into kafka message
+ key.converter.schemas.enable = false
+ value.converter.schemas.enable = false
+ # include ddl
+ include.schema.changes = true
+ # topic prefix
+ database.server.name = "mysql_cdc_1"
+ }
+ }
+}
+
+sink {
+ Kafka {
+ source_table_name = "table1"
+
+ bootstrap.servers = "localhost:9092"
+
+ # compatible_debezium_json options
+ format = compatible_debezium_json
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/debezium-json.md
b/docs/zh/connector-v2/formats/debezium-json.md
new file mode 100644
index 0000000000..9fef845449
--- /dev/null
+++ b/docs/zh/connector-v2/formats/debezium-json.md
@@ -0,0 +1,115 @@
+# Debezium 格式
+
+变更数据捕获格式:
+序列化模式、反序列化模式
+
+Debezium 是一套分布式服务,用于捕获数据库中的变化,以便您的应用程序可以看到这些变化并对其做出响应。Debezium
在变更事件流中记录每个数据库表中的所有行级变化,应用程序只需读取这些流,就可以按照它们发生的顺序看到变更事件
+
+SeaTunnel 支持将 Debezium JSON 消息解析为 INSERT/UPDATE/DELETE 消息并导入到 seatunnel
系统中。在许多情况下,利用这个特性是非常有用的,例如:
+
+ 将增量数据从数据库同步到其他系统
+ 审计日志
+ 数据库的实时物化视图
+ 关联维度数据库的变更历史,等等。
+
+SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息解析为 Debezium JSON
消息,并将其发送到类似 Kafka 这样的存储中
+
+# 格式选项
+
+| 选项 | 默认值 | 是否需要 | 描述
|
+|-----------------------------------|--------|------|--------------------------------------|
+| format | (none) | 是 | 指定要使用的格式,这里应该是
'debezium_json'. |
+| debezium-json.ignore-parse-errors | false | 否 |
跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null |
+
+# 如何使用
+
+## Kafka 使用示例
+
+Debezium 提供了一个统一的变更日志格式,下面是一个 MySQL products 表捕获的变更操作的简单示例
+
+```bash
+{
+ "before": {
+ "id": 111,
+ "name": "scooter",
+ "description": "Big 2-wheel scooter ",
+ "weight": 5.18
+ },
+ "after": {
+ "id": 111,
+ "name": "scooter",
+ "description": "Big 2-wheel scooter ",
+ "weight": 5.17
+ },
+ "source": {
+ "version": "1.1.1.Final",
+ "connector": "mysql",
+ "name": "dbserver1",
+ "ts_ms": 1589362330000,
+ "snapshot": "false",
+ "db": "inventory",
+ "table": "products",
+ "server_id": 223344,
+ "gtid": null,
+ "file": "mysql-bin.000003",
+ "pos": 2090,
+ "row": 0,
+ "thread": 2,
+ "query": null
+ },
+ "op": "u",
+ "ts_ms": 1589362330904,
+ "transaction": null
+}
+```
+
+注:请参考 [Debezium
文档](https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-events)
以了解每个字段的含义
+
+MySQL 的 products 表有 4 列(id、name、description 和 weight)
+上述 JSON 消息是产品表的一个更新变更事件,其中 id = 111 的行的 weight 值从 5.18 变为 5.17
+假设消息已经同步到 Kafka 主题 products_binlog,那么我们可以使用以下的 SeaTunnel 配置来消费这个主题并通过 Debezium
格式解释变更事件。
+
+在此配置中,您必须指定 `schema` 和 `debezium_record_include_schema` 选项:
+- `schema` 应与您的表格式相同
+- 如果您的 json 数据包含 `schema` 字段,`debezium_record_include_schema` 应为 true,如果您的
json 数据不包含 `schema` 字段,`debezium_record_include_schema` 应为 false
+- `{"schema" : {}, "payload": { "before" : {}, "after": {} ... } }` --> `true`
+- `{"before" : {}, "after": {} ... }` --> `false`"
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "products_binlog"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ }
+ debezium_record_include_schema = false
+ format = debezium_json
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "consume-binlog"
+ format = debezium_json
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
b/docs/zh/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
new file mode 100644
index 0000000000..d0ceb58ac6
--- /dev/null
+++ b/docs/zh/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
@@ -0,0 +1,47 @@
+# Kafka source 兼容 kafka-connect-json
+
+Seatunnel 的 Kafka 连接器支持解析通过 Kafka Connect Source 抽取的数据,特别是从 Kafka Connect JDBC
和 Kafka Connect Debezium 抽取的数据
+
+# 如何使用
+
+## Kafka 流入 Mysql
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "localhost:9092"
+ topic = "jdbc_source_record"
+ result_table_name = "kafka_table"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = COMPATIBLE_KAFKA_CONNECT_JSON
+ }
+}
+
+
+sink {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://localhost:3306/seatunnel"
+ user = st_user
+ password = seatunnel
+ generate_sink_sql = true
+ database = seatunnel
+ table = jdbc_sink
+ primary_keys = ["id"]
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/formats/ogg-json.md
b/docs/zh/connector-v2/formats/ogg-json.md
new file mode 100644
index 0000000000..ef2753b114
--- /dev/null
+++ b/docs/zh/connector-v2/formats/ogg-json.md
@@ -0,0 +1,93 @@
+# Ogg 格式
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a
ogg)
是一项托管服务,提供实时数据网格平台,该平台使用复制来保持数据高度可用,并支持实时分析。客户可以设计、执行和监控其数据复制和流数据处理解决方案,而无需分配或管理计算环境。
Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。
+
+SeaTunnel 支持将 Ogg JSON 消息解释为 Seatunnel 系统中的 INSERT/UPDATE/DELETE
消息。在许多情况下,这个特性带来了很多便利,例如
+
+ 将增量数据从数据库同步到其他系统
+ 审计日志
+ 数据库的实时物化视图
+ 关联维度数据库的变更历史,等等。
+
+SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息转化为 Ogg JSON 消息,并将其发送到类似
Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE
消息。因此,Seatunnel 将 UPDATE_BEFORE 和 UPDATE_AFTER 转化为 DELETE 和 INSERT Ogg 消息来实现
+
+# 格式选项
+
+| 选项 | 默认值 | 是否需要 |
描述 |
+|------------------------------|--------|------|------------------------------------------------------------------------------------|
+| format | (none) | 是 | 指定要使用的格式,这里应该是`-json`
|
+| ogg_json.ignore-parse-errors | false | 否 |
跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null
|
+| ogg_json.database.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal
记录中的`database`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 |
+| ogg_json.table.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal 记录中的
`table` 元字段来仅读取特定表的更改日志行。此字符串Pattern模式与Java的Pattern兼容 |
+
+# 如何使用 Ogg 格式
+
+## Kafka 使用示例
+
+Ogg 为变更日志提供了统一的格式,下面是从 Oracle PRODUCTS 表捕获变更操作的简单示例:
+
+```bash
+{
+ "before": {
+ "id": 111,
+ "name": "scooter",
+ "description": "Big 2-wheel scooter",
+ "weight": 5.18
+ },
+ "after": {
+ "id": 111,
+ "name": "scooter",
+ "description": "Big 2-wheel scooter",
+ "weight": 5.15
+ },
+ "op_type": "U",
+ "op_ts": "2020-05-13 15:40:06.000000",
+ "current_ts": "2020-05-13 15:40:07.000000",
+ "primary_keys": [
+ "id"
+ ],
+ "pos": "00000000000000000000143",
+ "table": "PRODUCTS"
+}
+```
+
+注:各字段含义请参考 [Debezium
文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events)注:各字段含义请参考
[Debezium
文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events)
+
+此 Oracle PRODUCTS 表有 4 列 (id, name, description 和 weight)
+上面的 JSON 消息是 products 表上的更新更改事件,其中 id = 111 的行的字段 `weight` 的值从 5.18 更改为 5.15。
+假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个 topic 并体现变更事件。
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+source {
+ Kafka {
+ bootstrap.servers = "127.0.0.1:9092"
+ topic = "ogg"
+ result_table_name = "kafka_name"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "double"
+ }
+ },
+ format = ogg_json
+ }
+}
+sink {
+ jdbc {
+ url = "jdbc:mysql://127.0.0.1/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "12345678"
+ table = "ogg"
+ primary_keys = ["id"]
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/sink/Console.md
b/docs/zh/connector-v2/sink/Console.md
new file mode 100644
index 0000000000..43dff33513
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Console.md
@@ -0,0 +1,124 @@
+# Console
+
+> Console 数据接收器
+
+## 支持连接器版本
+
+- 所有版本
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 描述
+
+接收Source端传入的数据并打印到控制台。支持批同步和流同步两种模式。
+
+> 例如,来自上游的数据为 [`age: 12, name: jared`] ,则发送到控制台的内容为:
`{"name":"jared","age":17}`
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+
+## 接收器选项
+
+| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
+|--------------------|---------|------|-----|---------------------------------------------------|
+| common-options | | 否 | - | Sink插件常用参数,请参考
[Sink常用选项](common-options.md) 了解详情 |
+| log.print.data | boolean | 否 | - | 确定是否应在日志中打印数据的标志。默认值为`true`
|
+| log.print.delay.ms | int | 否 | - | 将每个数据项打印到日志之间的延迟(以毫秒为单位)。默认值为`0`
|
+
+## 任务示例
+
+### 简单示例:
+
+> 随机生成的数据,包含两个字段,即 `name`(字符串类型)和 `age`(整型),写入控制台,并行度为 `1`
+
+```
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake"
+ }
+}
+```
+
+### 多数据源示例:
+
+> 多数据源示例,通过配置可以指定数据源写入指定接收器
+
+```
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake1"
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ sex = "string"
+ }
+ }
+ }
+ FakeSource {
+ result_table_name = "fake2"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Console {
+ source_table_name = "fake2"
+ }
+}
+```
+
+## 控制台示例数据
+
+控制台打印的输出:
+
+```
+2022-12-19 11:01:45,417 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
output rowType: name<STRING>, age<INT>
+2022-12-19 11:01:46,489 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
CpiOd, 8520946
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
eQqTs, 1256802974
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
UsRgO, 2053193072
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
jDQJj, 1993016602
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
rqdKp, 1392682764
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
wCoWN, 986999925
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
qomTU, 72775247
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
jcqXR, 1074529204
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
AkWIO, 1961723427
+2022-12-19 11:01:46,490 INFO
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter -
subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT:
hBoib, 929089763
+```
+
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md
b/docs/zh/connector-v2/sink/Elasticsearch.md
new file mode 100644
index 0000000000..edf974d8fb
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -0,0 +1,218 @@
+# Elasticsearch
+
+## 描述
+
+输出数据到 `Elasticsearch`
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+:::tip
+
+引擎支持
+
+* 支持 `ElasticSearch 版本 >= 2.x 并且 <= 8.x`
+
+:::
+
+## 选项
+
+| 名称 | 类型 | 是否必须 | 默认值 |
+|-------------------------|---------|------|------------------------------|
+| hosts | array | 是 | - |
+| index | string | 是 | - |
+| schema_save_mode | string | 是 | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode | string | 是 | APPEND_DATA |
+| index_type | string | 否 | |
+| primary_keys | list | 否 | |
+| key_delimiter | string | 否 | `_` |
+| username | string | 否 | |
+| password | string | 否 | |
+| max_retry_count | int | 否 | 3 |
+| max_batch_size | int | 否 | 10 |
+| tls_verify_certificate | boolean | 否 | true |
+| tls_verify_hostnames | boolean | 否 | true |
+| tls_keystore_path | string | 否 | - |
+| tls_keystore_password | string | 否 | - |
+| tls_truststore_path | string | 否 | - |
+| tls_truststore_password | string | 否 | - |
+| common-options | | 否 | - |
+
+### hosts [array]
+
+`Elasticsearch` 集群http地址,格式为 `host:port` ,允许指定多个主机。例如 `["host1:9200",
"host2:9200"]`
+
+### index [string]
+
+`Elasticsearch` 的 `index` 名称。索引支持包含字段名变量,例如 `seatunnel_${age}`,并且该字段必须出现在
seatunnel Row 中。如果没有,我们将把它视为普通索引
+
+### index_type [string]
+
+`Elasticsearch` 索引类型,elasticsearch 6及以上版本建议不要指定
+
+### primary_keys [list]
+
+主键字段用于生成文档 `_id` ,这是 CDC 必需的选项。
+
+### key_delimiter [string]
+
+设定复合键的分隔符(默认为 `_`),例如,如果使用 `$` 作为分隔符,那么文档的 `_id` 将呈现为 `KEY1$KEY2$KEY3` 的格式
+
+### username [string]
+
+x-pack 用户名
+
+### password [string]
+
+x-pack 密码
+
+### max_retry_count [int]
+
+批次批量请求最大尝试大小
+
+### max_batch_size [int]
+
+批次批量文档最大大小
+
+### tls_verify_certificate [boolean]
+
+为 HTTPS 端点启用证书验证
+
+### tls_verify_hostname [boolean]
+
+为 HTTPS 端点启用主机名验证
+
+### tls_keystore_path [string]
+
+指向 PEM 或 JKS 密钥存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件
+
+### tls_keystore_password [string]
+
+指定的密钥存储的密钥密码
+
+### tls_truststore_path [string]
+
+指向 PEM 或 JKS 信任存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件
+
+### tls_truststore_password [string]
+
+指定的信任存储的密钥密码
+
+### common options
+
+Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情
+
+### schema_save_mode
+
+在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案<br/>
+选项介绍:<br/>
+`RECREATE_SCHEMA` :当表不存在时会创建,当表已存在时会删除并重建<br/>
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时会创建,当表已存在时则跳过创建<br/>
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :当表不存在时将抛出错误<br/>
+
+### data_save_mode
+
+在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案<br/>
+选项介绍:<br/>
+`DROP_DATA`: 保留数据库结构,删除数据<br/>
+`APPEND_DATA`:保留数据库结构,保留数据<br/>
+`ERROR_WHEN_DATA_EXISTS`:当有数据时抛出错误<br/>
+
+## 示例
+
+简单示例
+
+```bash
+sink {
+ Elasticsearch {
+ hosts = ["localhost:9200"]
+ index = "seatunnel-${age}"
+ }
+}
+```
+
+变更数据捕获 (Change data capture) 事件
+
+```bash
+sink {
+ Elasticsearch {
+ hosts = ["localhost:9200"]
+ index = "seatunnel-${age}"
+
+ # CDC required options
+ primary_keys = ["key1", "key2", ...]
+ }
+}
+```
+
+SSL 禁用证书验证
+
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+
+ tls_verify_certificate = false
+ }
+}
+```
+
+SSL 禁用主机名验证
+
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+
+ tls_verify_hostname = false
+ }
+}
+```
+
+SSL 启用证书验证
+
+通过设置 `tls_keystore_path` 与 `tls_keystore_password` 指定证书路径及密码
+
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+
+ tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
+ tls_keystore_password = "${your password}"
+ }
+}
+```
+
+配置表生成策略 (schema_save_mode)
+
+通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
+
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
+## 变更日志
+
+### 下一版本
+
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3673](https://github.com/apache/seatunnel/pull/3673))
+- [Feature] Support https protocol & compatible with opensearch
([3997](https://github.com/apache/seatunnel/pull/3997))
+
diff --git a/docs/zh/connector-v2/sink/Email.md
b/docs/zh/connector-v2/sink/Email.md
new file mode 100644
index 0000000000..cc3999c580
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Email.md
@@ -0,0 +1,89 @@
+# Email
+
+> Email 数据接收器
+
+## 描述
+
+将接收的数据作为文件发送到电子邮件
+
+## 支持版本
+
+测试版本:1.5.6(供参考)
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+
+## 选项
+
+| 名称 | 类型 | 是否必须 | 默认值 |
+|--------------------------|--------|------|-----|
+| email_from_address | string | 是 | - |
+| email_to_address | string | 是 | - |
+| email_host | string | 是 | - |
+| email_transport_protocol | string | 是 | - |
+| email_smtp_auth | string | 是 | - |
+| email_authorization_code | string | 是 | - |
+| email_message_headline | string | 是 | - |
+| email_message_content | string | 是 | - |
+| common-options | | 否 | - |
+
+### email_from_address [string]
+
+发件人邮箱地址
+
+### email_to_address [string]
+
+接收邮件的地址
+
+### email_host [string]
+
+连接的SMTP服务器地址
+
+### email_transport_protocol [string]
+
+加载会话的协议
+
+### email_smtp_auth [string]
+
+是否对客户进行认证
+
+### email_authorization_code [string]
+
+授权码,您可以从邮箱设置中获取授权码
+
+### email_message_headline [string]
+
+邮件的标题
+
+### email_message_content [string]
+
+邮件消息的正文
+
+### common options
+
+Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情.
+
+## 示例
+
+```bash
+
+ EmailSink {
+ email_from_address = "[email protected]"
+ email_to_address = "[email protected]"
+ email_host="smtp.qq.com"
+ email_transport_protocol="smtp"
+ email_smtp_auth="true"
+ email_authorization_code=""
+ email_message_headline=""
+ email_message_content=""
+ }
+
+```
+
+## 变更日志
+
+### 2.2.0-beta 2022-09-26
+
+- 添加 Email 接收器连接器
+
diff --git a/docs/zh/connector-v2/sink/Feishu.md
b/docs/zh/connector-v2/sink/Feishu.md
index 01cdc7a482..c561e50a97 100644
--- a/docs/zh/connector-v2/sink/Feishu.md
+++ b/docs/zh/connector-v2/sink/Feishu.md
@@ -10,8 +10,8 @@
## 主要特性
-- [ ] [精确一次](../../../en/concept/connector-v2-features.md)
-- [ ] [变更数据捕获](../../../en/concept/connector-v2-features.md)
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [变更数据捕获](../../concept/connector-v2-features.md)
## 描述
@@ -42,11 +42,11 @@
## 接收器选项
-| 名称 | 类型 | 是否必需 | 默认值 |
描述 |
-|----------------|--------|------|-----|------------------------------------------------------------------------------------|
-| url | String | 是 | - | 飞书web hook URL
|
-| headers | Map | 否 | - | HTTP 请求头
|
-| common-options | | 否 | - | 接收器插件常见参数,请参阅
[接收器通用选项](../../../en/connector-v2/source/common-options.md) 以获取详细信息 |
+| 名称 | 类型 | 是否必需 | 默认值 | 描述
|
+|----------------|--------|------|-----|----------------------------------------------------|
+| url | String | 是 | - | 飞书web hook URL
|
+| headers | Map | 否 | - | HTTP 请求头
|
+| common-options | | 否 | - | 接收器插件常见参数,请参阅
[接收器通用选项](common-options.md) 以获取详细信息 |
## 任务示例
diff --git a/docs/zh/connector-v2/sink/HdfsFile.md
b/docs/zh/connector-v2/sink/HdfsFile.md
index 1ed6ea2d56..dee466770e 100644
--- a/docs/zh/connector-v2/sink/HdfsFile.md
+++ b/docs/zh/connector-v2/sink/HdfsFile.md
@@ -60,13 +60,15 @@
| kerberos_principal | string | 否 | -
| kerberos 的主体
|
| kerberos_keytab_path | string | 否 | -
| kerberos 的 keytab 路径
|
| compress_codec | string | 否 | none
| 压缩编解码器
|
-| common-options | object | 否 | -
| 接收器插件通用参数,请参阅
[接收器通用选项](../../../en/connector-v2/source/common-options.md) 了解详情
|
+| common-options | object | 否 | -
| 接收器插件通用参数,请参阅 [接收器通用选项](common-options.md) 了解详情
|
| max_rows_in_memory | int | 否 | -
| 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。
|
| sheet_name | string | 否 | Sheet${Random number}
| 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名
|
### 提示
-> 如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop
版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查
`${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。
+> 如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是
+> 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop
+> jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。
## 任务示例
diff --git a/docs/zh/connector-v2/sink/Http.md
b/docs/zh/connector-v2/sink/Http.md
new file mode 100644
index 0000000000..f837380efd
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Http.md
@@ -0,0 +1,63 @@
+# Http
+
+> Http 数据接收器
+
+## 支持引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## 描述
+
+接收Source端传入的数据,利用数据触发 web hooks。
+
+> 例如,来自上游的数据为[`age: 12, name: tyrantlucifer`],则body内容如下:`{"age": 12, "name":
"tyrantlucifer"}`
+
+**Tips: Http 接收器仅支持 `post json` 类型的 web hook,source 数据将被视为 webhook 中的 body
内容。**
+
+## 支持的数据源信息
+
+想使用 Http 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
+
+| 数据源 | 支持版本 | 依赖
|
+|------|------|------------------------------------------------------------------------------------------------------|
+| Http | 通用 |
[下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http)
|
+
+## 接收器选项
+
+| 名称 | 类型 | 是否必须 | 默认值 |
描述 |
+|-----------------------------|--------|------|-------|----------------------------------------------------|
+| url | String | 是 | - | Http 请求链接
|
+| headers | Map | 否 | - | Http 标头
|
+| retry | Int | 否 | - |
如果请求http返回`IOException`的最大重试次数 |
+| retry_backoff_multiplier_ms | Int | 否 | 100 | http请求失败,重试回退次数(毫秒)乘数
|
+| retry_backoff_max_ms | Int | 否 | 10000 | http请求失败,最大重试回退时间(毫秒)
|
+| connect_timeout_ms | Int | 否 | 12000 | 连接超时设置,默认12s
|
+| socket_timeout_ms | Int | 否 | 60000 | 套接字超时设置,默认为60s
|
+| common-options | | 否 | - | Sink插件常用参数,请参考
[Sink常用选项 ](common-options.md) 了解详情 |
+
+## 示例
+
+简单示例:
+
+```hocon
+Http {
+ url = "http://localhost/test/webhook"
+ headers {
+ token = "9e32e859ef044462a257e1fc76730066"
+ }
+}
+```
+
+## 变更日志
+
+### 2.2.0-beta 2022-09-26
+
+- 添加Http接收连接器
+
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
new file mode 100644
index 0000000000..e9c2665b75
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -0,0 +1,357 @@
+# JDBC
+
+> JDBC 数据接收器
+
+## 描述
+
+通过jdbc写入数据。支持批处理模式和流处理模式,支持并发写入,支持精确一次语义(使用XA事务保证)
+
+## 使用依赖
+
+### 用于Spark/Flink引擎
+
+> 1. 需要确保jdbc驱动jar包已经放在目录`${SEATUNNEL_HOME}/plugins/`下。
+
+### 适用于 SeaTunnel Zeta 引擎
+
+> 1. 需要确保jdbc驱动jar包已经放到`${SEATUNNEL_HOME}/lib/`目录下。
+
+## 主要特性
+
+- [x] [精确一次](../../concept/connector-v2-features.md)
+
+使用 `Xa transactions` 来确保 `exactly-once`。所以仅对于支持 `Xa transactions` 的数据库支持
`exactly-once`
+。你可以设置 `is_exactly_once=true` 来启用它。
+
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+| 名称 | 类型 | 是否必须 | 默认值
|
+|-------------------------------------------|---------|------|------------------------------|
+| url | String | 是 | -
|
+| driver | String | 是 | -
|
+| user | String | 否 | -
|
+| password | String | 否 | -
|
+| query | String | 否 | -
|
+| compatible_mode | String | 否 | -
|
+| database | String | 否 | -
|
+| table | String | 否 | -
|
+| primary_keys | Array | 否 | -
|
+| support_upsert_by_query_primary_key_exist | Boolean | 否 | false
|
+| connection_check_timeout_sec | Int | 否 | 30
|
+| max_retries | Int | 否 | 0
|
+| batch_size | Int | 否 | 1000
|
+| is_exactly_once | Boolean | 否 | false
|
+| generate_sink_sql | Boolean | 否 | false
|
+| xa_data_source_class_name | String | 否 | -
|
+| max_commit_attempts | Int | 否 | 3
|
+| transaction_timeout_sec | Int | 否 | -1
|
+| auto_commit | Boolean | 否 | true
|
+| field_ide | String | 否 | -
|
+| properties | Map | 否 | -
|
+| common-options | | 否 | -
|
+| schema_save_mode | Enum | 否 |
CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode | Enum | 否 | APPEND_DATA
|
+| custom_sql | String | 否 | -
|
+| enable_upsert | Boolean | 否 | true
|
+| use_copy_statement | Boolean | 否 | false
|
+
+### driver [string]
+
+用于连接远程数据源的 jdbc 类名,如果使用MySQL,则值为`com.mysql.cj.jdbc.Driver`
+
+### user [string]
+
+用户名
+
+### password [string]
+
+密码
+
+### url [string]
+
+JDBC 连接的 URL。参考案例:`jdbc:postgresql://localhost/test`
+
+### query [string]
+
+使用 sql 语句将上游输入数据写入到数据库。如 `INSERT ...`
+
+### compatible_mode [string]
+
+数据库的兼容模式,当数据库支持多种兼容模式时需要。例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。
+
+Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
+
+### database [string]
+
+使用此 `database` 和 `table-name` 自动生成 SQL,并接收上游输入的数据写入数据库。
+
+此选项与 `query` 选项是互斥的,此选项具有更高的优先级。
+
+### table [string]
+
+使用 `database` 和此 `table-name` 自动生成 SQL,并接收上游输入的数据写入数据库。
+
+此选项与 `query` 选项是互斥的,此选项具有更高的优先级。
+
+table参数可以填入一个任意的表名,这个名字最终会被用作创建表的表名,并且支持变量(`${table_name}`,`${schema_name}`)。
+替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${table_name}` 将替换传递给目标端的表名。
+
+mysql 接收器示例:
+
+1. test_${schema_name}_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+pgsql (Oracle Sqlserver ...) 接收器示例:
+
+1. ${schema_name}.${table_name}_test
+2. dbo.tt_${table_name}_sink
+3. public.sink_table
+
+Tip: 如果目标数据库有 SCHEMA 的概念,则表参数必须写成 `xxx.xxx`
+
+### primary_keys [array]
+
+该选项用于辅助生成 insert、delete、update 等 sql 语句。设置了该选项,将会根据该选项生成对应的 sql 语句
+
+### support_upsert_by_query_primary_key_exist [boolean]
+
+根据查询主键是否存在来选择使用 INSERT sql、UPDATE sql 来处理变更事件(INSERT、UPDATE_AFTER)。仅当数据库不支持
upsert 语法时才使用此配置
+**注意**:该方法性能较低
+
+### connection_check_timeout_sec [int]
+
+用于验证数据库连接的有效性时等待数据库操作完成所需的时间,单位是秒
+
+### max_retries[int]
+
+重试提交失败的最大次数(executeBatch)
+
+### batch_size[int]
+
+对于批量写入,当缓冲的记录数达到 `batch_size` 数量或者时间达到 `checkpoint.interval` 时,数据将被刷新到数据库中
+
+### is_exactly_once[boolean]
+
+是否启用通过XA事务实现的精确一次语义。开启,你还需要设置 `xa_data_source_class_name`
+
+### generate_sink_sql[boolean]
+
+根据要写入的数据库表结构生成 sql 语句
+
+### xa_data_source_class_name[string]
+
+指数据库驱动的 XA 数据源的类名。以 MySQL 为例,其类名为
com.mysql.cj.jdbc.MysqlXADataSource。了解其他数据库的数据源类名,可以参考文档的附录部分
+
+### max_commit_attempts[int]
+
+事务提交失败的最大重试次数
+
+### transaction_timeout_sec[int]
+
+在事务开启后的超时时间,默认值为-1(即永不超时)。请注意,设置超时时间可能会影响到精确一次(exactly-once)的语义
+
+### auto_commit [boolean]
+
+默认启用自动事务提交
+
+### field_ide [String]
+
+字段 `field_ide` 用于在从 source 同步到 sink 时,确定字段是否需要转换为大写或小写。'ORIGINAL'
表示不需要转换,'UPPERCASE' 表示转换为大写,'LOWERCASE' 表示转换为小写
+
+### properties
+
+附加连接配置参数,当属性和URL具有相同参数时,优先级由驱动程序的具体实现确定。例如,在 MySQL 中,属性配置优先于 URL。
+
+### common options
+
+Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情
+
+### schema_save_mode [Enum]
+
+在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案<br/>
+选项介绍:<br/>
+`RECREATE_SCHEMA`:当表不存在时会创建,当表已存在时会删除并重建<br/>
+`CREATE_SCHEMA_WHEN_NOT_EXIST`:当表不存在时会创建,当表已存在时则跳过创建<br/>
+`ERROR_WHEN_SCHEMA_NOT_EXIST`:当表不存在时将抛出错误<br/>
+
+### data_save_mode [Enum]
+
+在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案<br/>
+选项介绍:<br/>
+`DROP_DATA`:保留数据库结构,删除数据<br/>
+`APPEND_DATA`:保留数据库结构,保留数据<br/>
+`CUSTOM_PROCESSING`:允许用户自定义数据处理方式<br/>
+`ERROR_WHEN_DATA_EXISTS`:当有数据时抛出错误<br/>
+
+### custom_sql [String]
+
+当`data_save_mode`选择`CUSTOM_PROCESSING`时,需要填写`CUSTOM_SQL`参数。该参数通常填写一条可以执行的SQL。SQL将在同步任务之前执行
+
+### enable_upsert [boolean]
+
+启用通过主键更新插入,如果任务没有key重复数据,设置该参数为 false 可以加快数据导入速度
+
+### use_copy_statement [boolean]
+
+使用 `COPY ${table} FROM STDIN` 语句导入数据。仅支持具有 `getCopyAPI()`
方法连接的驱动程序。例如:Postgresql
+驱动程序 `org.postgresql.Driver`
+
+注意:不支持 `MAP`、`ARRAY`、`ROW`类型
+
+## tips
+
+在 is_exactly_once = "true" 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:<br/>
+1 postgres 需要设置 `max_prepared_transactions > 1` 例如 `ALTER SYSTEM set
max_prepared_transactions to 10` <br/>
+2 mysql 版本需要 >= `8.0.29` 并且非 root 用户需要授予 `XA_RECOVER_ADMIN` 权限。例如:将 test_db.*
上的 XA_RECOVER_ADMIN
+授予 `'user1'@'%'`<br/>
+3 mysql可以尝试在url中添加 `rewriteBatchedStatements=true` 参数以获得更好的性能<br/>
+
+## 附录
+
+附录参数仅提供参考
+
+| 数据源 | driver |
url |
xa_data_source_class_name |
maven |
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
+| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
+| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
+| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
+| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
|
+| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
+| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
+| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
+| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
+| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
+| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
|
+
+## 示例
+
+简单示例
+
+```
+jdbc {
+ url = "jdbc:mysql://localhost:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "123456"
+ query = "insert into test_table(name,age) values(?,?)"
+}
+
+```
+
+精确一次 (Exactly-once)
+
+通过设置 `is_exactly_once` 开启精确一次语义
+
+```
+jdbc {
+
+ url = "jdbc:mysql://localhost:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+
+ max_retries = 0
+ user = "root"
+ password = "123456"
+ query = "insert into test_table(name,age) values(?,?)"
+
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+}
+```
+
+变更数据捕获 (Change data capture) 事件
+
+jdbc 接收 CDC 示例
+
+```
+sink {
+ jdbc {
+ url = "jdbc:mysql://localhost:3306"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "123456"
+
+ database = "sink_database"
+ table = "sink_table"
+ primary_keys = ["key1", "key2", ...]
+ }
+}
+```
+
+配置表生成策略 (schema_save_mode)
+
+通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
+
+```
+sink {
+ jdbc {
+ url = "jdbc:mysql://localhost:3306"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "123456"
+
+ database = "sink_database"
+ table = "sink_table"
+ primary_keys = ["key1", "key2", ...]
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
+ }
+}
+```
+
+支持Postgres 9.5及以下版本的 CDC 示例
+
+Postgres 9.5及以下版本,通过设置 `compatible_mode` 配置为 `postgresLow` 来支持 Postgres CDC 操作
+
+```
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "123456"
+ compatible_mode="postgresLow"
+ database = "sink_database"
+ table = "sink_table"
+ support_upsert_by_query_primary_key_exist = true
+ generate_sink_sql = true
+ primary_keys = ["key1", "key2", ...]
+ }
+}
+
+```
+
+## 变更日志
+
+### 2.3.0-beta 2022-10-20
+
+- [BugFix] Fix JDBC split exception
([2904](https://github.com/apache/seatunnel/pull/2904))
+- [Feature] Support Phoenix JDBC Sink
([2499](https://github.com/apache/seatunnel/pull/2499))
+- [Feature] Support SQL Server JDBC Sink
([2646](https://github.com/apache/seatunnel/pull/2646))
+- [Feature] Support Oracle JDBC Sink
([2550](https://github.com/apache/seatunnel/pull/2550))
+- [Feature] Support StarRocks JDBC Sink
([3060](https://github.com/apache/seatunnel/pull/3060))
+- [Feature] Support DB2 JDBC Sink
([2410](https://github.com/apache/seatunnel/pull/2410))
+
+### next version
+
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3378](https://github.com/apache/seatunnel/issues/3378))
+- [Feature] Support Teradata JDBC Sink
([3362](https://github.com/apache/seatunnel/pull/3362))
+- [Feature] Support Sqlite JDBC Sink
([3089](https://github.com/apache/seatunnel/pull/3089))
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3378](https://github.com/apache/seatunnel/issues/3378))
+- [Feature] Support Doris JDBC Sink
+- [Feature] Support Redshift JDBC
Sink([#3615](https://github.com/apache/seatunnel/pull/3615))
+- [Improve] Add config item enable upsert by
query([#3708](https://github.com/apache/seatunnel/pull/3708))
+- [Improve] Add database field to sink
config([#4199](https://github.com/apache/seatunnel/pull/4199))
+- [Improve] Add Vertica
connector([#4303](https://github.com/apache/seatunnel/pull/4303))
+
diff --git a/docs/zh/connector-v2/sink/Kafka.md
b/docs/zh/connector-v2/sink/Kafka.md
new file mode 100644
index 0000000000..c0ce933870
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Kafka.md
@@ -0,0 +1,196 @@
+# Kafka
+
+> Kafka 数据接收器
+
+## 支持引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+> 默认情况下,我们将使用 2pc 来保证消息只发送一次到kafka
+
+## 描述
+
+将 Rows 内容发送到 Kafka topic
+
+## 支持的数据源信息
+
+为了使用 Kafka 连接器,需要以下依赖项
+可以通过 install-plugin.sh 或从 Maven 中央存储库下载
+
+| 数据源 | 支持版本 | Maven
|
+|-------|------|-------------------------------------------------------------------------------------------------------|
+| Kafka | 通用 |
[下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka)
|
+
+## 接收器选项
+
+| 名称 | 类型 | 是否需要 | 默认值 |
描述
|
+|----------------------|--------|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic | String | 是 | - | 当表用作接收器时,topic 名称是要写入数据的 topic
|
+| bootstrap.servers | String | 是 | - | Kafka brokers 使用逗号分隔
|
+| kafka.config | Map | 否 | - | 除了上述 Kafka Producer
客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖
[Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs)
|
+| semantics | String | 否 | NON | 可以选择的语义是
EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。
|
+| partition_key_fields | Array | 否 | - | 配置字段用作 kafka 消息的key
|
+| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区
|
+| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息
|
+| transaction_prefix | String | 否 | - |
如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId
来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀
|
+| format | String | 否 | json |
数据格式。默认格式是json。可选文本格式,canal-json、debezium-json 和 avro。如果使用 json
或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅
[debezium-json](../formats/debezium-json.md) 了解详细信息 |
+| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符
|
+| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项
](common-options.md) 了解详情
|
+
+## 参数解释
+
+### Topic 格式
+
+目前支持两种格式:
+
+1. 填写topic名称
+
+2. 使用上游数据中的字段值作为 topic ,格式是 `${your field name}`, 其中 topic 是上游数据的其中一列的值
+
+ 例如,上游数据如下:
+
+| name | age | data |
+|------|-----|---------------|
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
+
+如果 `${name}` 设置为 topic。因此,第一行发送到 Jack topic,第二行发送到 Mary topic。
+
+### 语义
+
+在 EXACTLY_ONCE 中,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给
Kafka,该模式下能保证数据精确写入kafka一次,即使任务失败重试也不会出现数据重复和丢失
+在 AT_LEAST_ONCE 中,生产者将等待 Kafka 缓冲区中所有未完成的消息在检查点上被 Kafka
生产者确认,该模式下能保证数据至少写入kafka一次,即使任务失败
+NON 不提供任何保证:如果 Kafka 代理出现问题,消息可能会丢失,并且消息可能会重复,该模式下,任务失败重试可能会产生数据丢失或重复。
+
+### 分区关键字段
+
+例如,如果你想使用上游数据中的字段值作为键,可以将这些字段名指定给此属性
+
+上游数据如下所示:
+
+| name | age | data |
+|------|-----|---------------|
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
+
+如果将 name 设置为 key,那么 name 列的哈希值将决定消息发送到哪个分区。
+如果没有设置分区键字段,则将发送空消息键。
+消息 key 的格式为 json,如果设置 name 为 key,例如 `{"name":"Jack"}`。
+所选的字段必须是上游数据中已存在的字段。
+
+### 分区分配
+
+假设总有五个分区,配置中的 assign_partitions 字段设置为:
+assign_partitions = ["shoe", "clothing"]
+在这种情况下,包含 "shoe" 的消息将被发送到第零个分区,因为 "shoe" 在 assign_partitions 中被标记为零, 而包含
"clothing" 的消息将被发送到第一个分区。
+对于其他的消息,我们将使用哈希算法将它们均匀地分配到剩余的分区中。
+这个功能是通过 MessageContentPartitioner 类实现的,该类实现了
org.apache.kafka.clients.producer.Partitioner 接口。如果我们需要自定义分区,我们需要实现这个接口。
+
+## 任务示例
+
+### 简单:
+
+> 此示例展示了如何定义一个 SeaTunnel 同步任务,该任务能够通过 FakeSource 自动产生数据并将其发送到 Kafka
Sink。在这个例子中,FakeSource 会生成总共 16 行数据(`row.num=16`),每一行都包含两个字段,即 `name`(字符串类型)和
`age`(整型)。最终,这些数据将被发送到名为 test_topic 的 topic 中,因此该 topic 也将包含 16 行数据。
+> 如果你还未安装和部署 SeaTunnel,你需要参照
[安装SeaTunnel](../../start-v2/locally/deployment.md) 的指南来进行安装和部署。完成安装和部署后,你可以按照
[快速开始使用 SeaTunnel 引擎](../../start-v2/locally/quick-start-seatunnel-engine.md)
的指南来运行任务。
+
+```hocon
+# Defining the runtime environment
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ parallelism = 1
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ kafka {
+ topic = "test_topic"
+ bootstrap.servers = "localhost:9092"
+ format = json
+ kafka.request.timeout.ms = 60000
+ semantics = EXACTLY_ONCE
+ kafka.config = {
+ acks = "all"
+ request.timeout.ms = 60000
+ buffer.memory = 33554432
+ }
+ }
+}
+```
+
+### AWS MSK SASL/SCRAM
+
+将以下 `${username}` 和 `${password}` 替换为 AWS MSK 中的配置值。
+
+```hocon
+sink {
+ kafka {
+ topic = "seatunnel"
+ bootstrap.servers = "localhost:9092"
+ format = json
+ kafka.request.timeout.ms = 60000
+ semantics = EXACTLY_ONCE
+ kafka.config = {
+ security.protocol=SASL_SSL
+ sasl.mechanism=SCRAM-SHA-512
+
sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule
required \nusername=${username}\npassword=${password};"
+ }
+ }
+}
+```
+
+### AWS MSK IAM
+
+从 https://github.com/aws/aws-msk-iam-auth/releases 下载
`aws-msk-iam-auth-1.1.5.jar`
+并将其放入 `$SEATUNNEL_HOME/plugin/kafka/lib` 中目录。
+请确保 IAM 策略具有 `kafka-cluster:Connect`
+如下配置:
+
+```hocon
+"Effect": "Allow",
+"Action": [
+ "kafka-cluster:Connect",
+ "kafka-cluster:AlterCluster",
+ "kafka-cluster:DescribeCluster"
+],
+```
+
+接收器配置
+
+```hocon
+sink {
+ kafka {
+ topic = "seatunnel"
+ bootstrap.servers = "localhost:9092"
+ format = json
+ kafka.request.timeout.ms = 60000
+ semantics = EXACTLY_ONCE
+ kafka.config = {
+ security.protocol=SASL_SSL
+ sasl.mechanism=AWS_MSK_IAM
+ sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule
required;"
+
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
+ }
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/sink/common-options.md
b/docs/zh/connector-v2/sink/common-options.md
new file mode 100644
index 0000000000..8569b46da0
--- /dev/null
+++ b/docs/zh/connector-v2/sink/common-options.md
@@ -0,0 +1,58 @@
+# Sink 常用选项
+
+> Sink 连接器常用参数
+
+| 名称 | 类型 | 是否需要 | 默认值 |
+|-------------------|--------|------|-----|
+| source_table_name | string | 否 | - |
+| parallelism | int | 否 | - |
+
+### source_table_name [string]
+
+当不指定 `source_table_name` 时,当前插件处理配置文件中上一个插件输出的数据集 `dataset`
+
+当指定了 `source_table_name` 时,当前插件正在处理该参数对应的数据集
+
+### parallelism [int]
+
+当没有指定`parallelism`时,默认使用 env 中的 `parallelism`。
+
+当指定 `parallelism` 时,它将覆盖 env 中的 `parallelism`。
+
+## Examples
+
+```bash
+source {
+ FakeSourceStream {
+ parallelism = 2
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+}
+
+transform {
+ Filter {
+ source_table_name = "fake"
+ fields = [name]
+ result_table_name = "fake_name"
+ }
+ Filter {
+ source_table_name = "fake"
+ fields = [age]
+ result_table_name = "fake_age"
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake_name"
+ }
+ Console {
+ source_table_name = "fake_age"
+ }
+}
+```
+
+> 如果作业只有一个 source 和一个(或零个)transform 和一个 sink ,则不需要为连接器指定 `source_table_name` 和
`result_table_name`。
+> 如果 source 、transform 和 sink 中任意运算符的数量大于 1,则必须为作业中的每个连接器指定
`source_table_name` 和 `result_table_name`
+