EricJoy2048 commented on code in PR #6663: URL: https://github.com/apache/seatunnel/pull/6663#discussion_r1571846706
########## docs/zh/connector-v2/formats/ogg-json.md: ########## @@ -0,0 +1,93 @@ +# Ogg 格式 + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a ogg) 是一项托管服务,提供实时数据网格平台,该平台使用复制来保持数据高度可用,并支持实时分析。客户可以设计、执行和监控其数据复制和流数据处理解决方案,而无需分配或管理计算环境。 Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +SeaTunnel 支持将 Ogg JSON 消息解释为 Seatunnel 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这个特性带来了很多便利,例如 + + 将增量数据从数据库同步到其他系统 + 审计日志 + 数据库的实时物化视图 + 关联维度数据库的变更历史,等等。 + +SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息转化为 Ogg JSON 消息,并将其发送到类似 Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Seatunnel 将 UPDATE_BEFORE 和 UPDATE_AFTER 转化为 DELETE 和 INSERT Ogg 消息来实现 + +# 格式选项 + +| 选项 | 默认值 | 是否需要 | 描述 | +|------------------------------|--------|------|------------------------------------------------------------------------------------| +| format | (none) | 是 | 指定要使用的格式,这里应该是`-json` | +| ogg_json.ignore-parse-errors | false | 否 | 跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null | +| ogg_json.database.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal 记录中的`database`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 | +| ogg_json.table.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal 记录中的 `table` 元字段来仅读取特定表的更改日志行。此字符串Pattern模式与Java的Pattern兼容 | + +# 如何使用 Ogg 格式 + +## Kafka 使用示例 + +Ogg 为变更日志提供了统一的格式,下面是从 Oracle PRODUCTS 表捕获变更操作的简单示例: + +```bash +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +注:各字段含义请参考 [Debezium 文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events)注:各字段含义请参考 [Debezium 文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events) + +此 Oracle PRODUCTS 表有 4 列 (id, name, description 和 weight) +上面的 JSON 消息是 products 表上的更新更改事件,其中 id = 111 的行的权重值从 5.18 更改为 5.15。 Review Comment: ```suggestion 上面的 JSON 消息是 products 表上的更新更改事件,其中 id = 111 的行的字段`weight`的值从 5.18 更改为 5.15。 ``` ########## docs/zh/connector-v2/sink/Elasticsearch.md: ########## @@ -0,0 +1,218 @@ +# Elasticsearch + +## 描述 + +输出数据到 `Elasticsearch` + +## 主要特性 + +- [ ] [精确一次](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +:::tip + +引擎支持 + +* 支持 `ElasticSearch 版本 >= 2.x 并且 <= 8.x` + +::: + +## 选项 + +| 名称 | 类型 | 是否必须 | 默认值 | +|-------------------------|---------|------|------------------------------| +| hosts | array | 是 | - | +| index | string | 是 | - | +| schema_save_mode | string | 是 | CREATE_SCHEMA_WHEN_NOT_EXIST | +| data_save_mode | string | 是 | APPEND_DATA | +| index_type | string | 否 | | +| primary_keys | list | 否 | | +| key_delimiter | string | 否 | `_` | +| username | string | 否 | | +| password | string | 否 | | +| max_retry_count | int | 否 | 3 | +| max_batch_size | int | 否 | 10 | +| tls_verify_certificate | boolean | 否 | true | +| tls_verify_hostnames | boolean | 否 | true | +| tls_keystore_path | string | 否 | - | +| tls_keystore_password | string | 否 | - | +| tls_truststore_path | string | 否 | - | +| tls_truststore_password | string | 否 | - | +| common-options | | 否 | - | + +### hosts [array] + +`Elasticsearch` 集群http地址,格式为 `host:port` ,允许指定多个主机。例如 `["host1:9200", "host2:9200"]` + +### index [string] + +`Elasticsearch` 的 `index` 名称。索引支持包含字段名变量,例如 `seatunnel_${age}`,并且该字段必须出现在 seatunnel Row 中。如果没有,我们将把它视为普通索引 + +### index_type [string] + +`Elasticsearch` 索引类型,elasticsearch 6及以上版本建议不要指定 + +### primary_keys [list] + +主键字段用于生成文档 `_id` ,这是 CDC 必需的选项。 + +### key_delimiter [string] + +设定复合键的分隔符(默认为 `_`),例如,如果使用 `$` 作为分隔符,那么文档的 `_id` 将呈现为 `KEY1$KEY2$KEY3` 的格式 + +### username [string] + +x-pack 用户名 + +### password [string] + +x-pack 密码 + +### max_retry_count [int] + +批次批量请求最大尝试大小 + +### max_batch_size [int] + +批次批量文档最大大小 + +### tls_verify_certificate [boolean] + +为 HTTPS 端点启用证书验证 + +### tls_verify_hostname [boolean] + +为 HTTPS 端点启用主机名验证 + +### tls_keystore_path [string] + +指向 PEM 或 JKS 密钥存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件 + +### tls_keystore_password [string] + +指定的密钥存储的密钥密码 + +### tls_truststore_path [string] + +指向 PEM 或 JKS 信任存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件 + +### tls_truststore_password [string] + +指定的信任存储的密钥密码 + +### common options + +Sink插件常用参数,请参考 [Sink常用选项](common-options.md) 了解详情 + +### schema_save_mode + +在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案<br/> +选项介绍:<br/> +`RECREATE_SCHEMA` :当表不存在时会创建,当表已存在时会删除并重建<br/> +`CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时会创建,当表已存在时则跳过<br/> Review Comment: ```suggestion `CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时会创建,当表已存在时则跳过创建<br/> ``` ########## docs/zh/connector-v2/sink/Kafka.md: ########## @@ -0,0 +1,196 @@ +# Kafka + +> Kafka 数据接收器 + +## 支持引擎 + +> Spark<br/> +> Flink<br/> +> SeaTunnel Zeta<br/> + +## 主要特性 + +- [x] [精确一次](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +> 默认情况下,我们将使用 2pc 来保证消息只发送一次到kafka + +## 描述 + +将 Rows 内容发送到 Kafka topic + +## 支持的数据源信息 + +为了使用 Kafka 连接器,需要以下依赖项 +可以通过 install-plugin.sh 或从 Maven 中央存储库下载 + +| 数据源 | 支持版本 | Maven | +|-------|------|-------------------------------------------------------------------------------------------------------| +| Kafka | 通用 | [下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | + +## 接收器选项 + +| 名称 | 类型 | 是否需要 | 默认值 | 描述 | +|----------------------|--------|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | 是 | - | 当表用作接收器时,topic 名称是要写入数据的 topic | +| bootstrap.servers | String | 是 | - | Kafka brokers 使用逗号分隔 | +| kafka.config | Map | 否 | - | 除了上述 Kafka Producer 客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 [Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs) | +| semantics | String | 否 | NON | 可以选择的语义是 EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。 | +| partition_key_fields | Array | 否 | - | 配置字段用作 kafka 消息的key | +| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区 | +| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 | +| transaction_prefix | String | 否 | - | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 | +| format | String | 否 | json | 数据格式。默认格式是json。可选文本格式,canal-json、debezium-json 和 avro。如果使用 json 或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 | +| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符 | +| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项 ](common-options.md) 了解详情 | + +## 参数解释 + +### Topic 格式 + +目前支持两种格式: + +1. 填写topic名称 + +2. 使用上游数据中的字段值作为 topic ,格式是 `${your field name}`, 其中 topic 是上游数据的其中一列的值 + + 例如,上游数据如下: + +| name | age | data | +|------|-----|---------------| +| Jack | 16 | data-example1 | +| Mary | 23 | data-example2 | + +如果 `${name}` 设置为 topic。因此,第一行发送到 Jack topic,第二行发送到 Mary topic。 + +### 语义 + +在 EXACTLY_ONCE 中,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给 Kafka +在 AT_LEAST_ONCE 中,生产者将等待 Kafka 缓冲区中所有未完成的消息在检查点上被 Kafka 生产者确认 +NON 不提供任何保证:如果 Kafka 代理出现问题,消息可能会丢失,并且消息可能会重复 Review Comment: ```suggestion NON 不提供任何保证:如果 Kafka 代理出现问题,消息可能会丢失,并且消息可能会重复,该模式下,任务失败重试可能会产生数据丢失或重复。 ``` ########## docs/zh/connector-v2/sink/Kafka.md: ########## @@ -0,0 +1,196 @@ +# Kafka + +> Kafka 数据接收器 + +## 支持引擎 + +> Spark<br/> +> Flink<br/> +> SeaTunnel Zeta<br/> + +## 主要特性 + +- [x] [精确一次](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +> 默认情况下,我们将使用 2pc 来保证消息只发送一次到kafka + +## 描述 + +将 Rows 内容发送到 Kafka topic + +## 支持的数据源信息 + +为了使用 Kafka 连接器,需要以下依赖项 +可以通过 install-plugin.sh 或从 Maven 中央存储库下载 + +| 数据源 | 支持版本 | Maven | +|-------|------|-------------------------------------------------------------------------------------------------------| +| Kafka | 通用 | [下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | + +## 接收器选项 + +| 名称 | 类型 | 是否需要 | 默认值 | 描述 | +|----------------------|--------|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | 是 | - | 当表用作接收器时,topic 名称是要写入数据的 topic | +| bootstrap.servers | String | 是 | - | Kafka brokers 使用逗号分隔 | +| kafka.config | Map | 否 | - | 除了上述 Kafka Producer 客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 [Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs) | +| semantics | String | 否 | NON | 可以选择的语义是 EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。 | +| partition_key_fields | Array | 否 | - | 配置字段用作 kafka 消息的key | +| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区 | +| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 | +| transaction_prefix | String | 否 | - | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 | +| format | String | 否 | json | 数据格式。默认格式是json。可选文本格式,canal-json、debezium-json 和 avro。如果使用 json 或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 | +| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符 | +| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项 ](common-options.md) 了解详情 | + +## 参数解释 + +### Topic 格式 + +目前支持两种格式: + +1. 填写topic名称 + +2. 使用上游数据中的字段值作为 topic ,格式是 `${your field name}`, 其中 topic 是上游数据的其中一列的值 + + 例如,上游数据如下: + +| name | age | data | +|------|-----|---------------| +| Jack | 16 | data-example1 | +| Mary | 23 | data-example2 | + +如果 `${name}` 设置为 topic。因此,第一行发送到 Jack topic,第二行发送到 Mary topic。 + +### 语义 + +在 EXACTLY_ONCE 中,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给 Kafka +在 AT_LEAST_ONCE 中,生产者将等待 Kafka 缓冲区中所有未完成的消息在检查点上被 Kafka 生产者确认 Review Comment: ```suggestion 在 AT_LEAST_ONCE 中,生产者将等待 Kafka 缓冲区中所有未完成的消息在检查点上被 Kafka 生产者确认,该模式下能保证数据至少写入kafka一次,即使任务失败重试也不会出现数据丢失,但可能有会发生数据重复。 ``` ########## docs/zh/connector-v2/sink/Kafka.md: ########## @@ -0,0 +1,196 @@ +# Kafka + +> Kafka 数据接收器 + +## 支持引擎 + +> Spark<br/> +> Flink<br/> +> SeaTunnel Zeta<br/> + +## 主要特性 + +- [x] [精确一次](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +> 默认情况下,我们将使用 2pc 来保证消息只发送一次到kafka + +## 描述 + +将 Rows 内容发送到 Kafka topic + +## 支持的数据源信息 + +为了使用 Kafka 连接器,需要以下依赖项 +可以通过 install-plugin.sh 或从 Maven 中央存储库下载 + +| 数据源 | 支持版本 | Maven | +|-------|------|-------------------------------------------------------------------------------------------------------| +| Kafka | 通用 | [下载](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | + +## 接收器选项 + +| 名称 | 类型 | 是否需要 | 默认值 | 描述 | +|----------------------|--------|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | 是 | - | 当表用作接收器时,topic 名称是要写入数据的 topic | +| bootstrap.servers | String | 是 | - | Kafka brokers 使用逗号分隔 | +| kafka.config | Map | 否 | - | 除了上述 Kafka Producer 客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 [Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs) | +| semantics | String | 否 | NON | 可以选择的语义是 EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。 | +| partition_key_fields | Array | 否 | - | 配置字段用作 kafka 消息的key | +| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区 | +| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 | +| transaction_prefix | String | 否 | - | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 | +| format | String | 否 | json | 数据格式。默认格式是json。可选文本格式,canal-json、debezium-json 和 avro。如果使用 json 或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 | +| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符 | +| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项 ](common-options.md) 了解详情 | + +## 参数解释 + +### Topic 格式 + +目前支持两种格式: + +1. 填写topic名称 + +2. 使用上游数据中的字段值作为 topic ,格式是 `${your field name}`, 其中 topic 是上游数据的其中一列的值 + + 例如,上游数据如下: + +| name | age | data | +|------|-----|---------------| +| Jack | 16 | data-example1 | +| Mary | 23 | data-example2 | + +如果 `${name}` 设置为 topic。因此,第一行发送到 Jack topic,第二行发送到 Mary topic。 + +### 语义 + +在 EXACTLY_ONCE 中,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给 Kafka Review Comment: ```suggestion 在 EXACTLY_ONCE 中,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给 Kafka,该模式下能保证数据精确写入kafka一次,即使任务失败重试也不会出现数据重复和丢失 ``` ########## docs/zh/connector-v2/formats/ogg-json.md: ########## @@ -0,0 +1,93 @@ +# Ogg 格式 + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a ogg) 是一项托管服务,提供实时数据网格平台,该平台使用复制来保持数据高度可用,并支持实时分析。客户可以设计、执行和监控其数据复制和流数据处理解决方案,而无需分配或管理计算环境。 Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +SeaTunnel 支持将 Ogg JSON 消息解释为 Seatunnel 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这个特性带来了很多便利,例如 + + 将增量数据从数据库同步到其他系统 + 审计日志 + 数据库的实时物化视图 + 关联维度数据库的变更历史,等等。 + +SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息转化为 Ogg JSON 消息,并将其发送到类似 Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Seatunnel 将 UPDATE_BEFORE 和 UPDATE_AFTER 转化为 DELETE 和 INSERT Ogg 消息来实现 + +# 格式选项 + +| 选项 | 默认值 | 是否需要 | 描述 | +|------------------------------|--------|------|------------------------------------------------------------------------------------| +| format | (none) | 是 | 指定要使用的格式,这里应该是`-json` | +| ogg_json.ignore-parse-errors | false | 否 | 跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null | +| ogg_json.database.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal 记录中的`database`元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容 | +| ogg_json.table.include | (none) | 否 | 正则表达式,可选,通过正则匹配 Canal 记录中的 `table` 元字段来仅读取特定表的更改日志行。此字符串Pattern模式与Java的Pattern兼容 | + +# 如何使用 Ogg 格式 + +## Kafka 使用示例 + +Ogg 为变更日志提供了统一的格式,下面是从 Oracle PRODUCTS 表捕获变更操作的简单示例: + +```bash +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +注:各字段含义请参考 [Debezium 文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events)注:各字段含义请参考 [Debezium 文档](https://debezium.io/documentation/reference/2.5/connectors/oracle.html#oracle-events) + +此 Oracle PRODUCTS 表有 4 列 (id, name, description 和 weight) +上面的 JSON 消息是 products 表上的更新更改事件,其中 id = 111 的行的权重值从 5.18 更改为 5.15。 +假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个主题并体现变更事件。 Review Comment: ```suggestion 假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个topic并体现变更事件。 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
