This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0727a58c69 [Doc][Improve] translate postgresql related chinese
document (#8552)
0727a58c69 is described below
commit 0727a58c697e3e19dd3d91b383d46b5655bb82df
Author: Liujinhui <[email protected]>
AuthorDate: Mon Jan 20 13:11:32 2025 +0800
[Doc][Improve] translate postgresql related chinese document (#8552)
---
docs/zh/connector-v2/sink/PostgreSql.md | 270 ++++++++++++++++++++++
docs/zh/connector-v2/source/PostgreSQL-CDC.md | 193 ++++++++++++++++
docs/zh/connector-v2/source/PostgreSQL.md | 307 ++++++++++++++++++++++++++
3 files changed, 770 insertions(+)
diff --git a/docs/zh/connector-v2/sink/PostgreSql.md
b/docs/zh/connector-v2/sink/PostgreSql.md
new file mode 100644
index 0000000000..7a05342170
--- /dev/null
+++ b/docs/zh/connector-v2/sink/PostgreSql.md
@@ -0,0 +1,270 @@
+# PostgreSql
+
+> JDBC PostgreSql 数据接收器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 描述
+
+通过 JDBC 写入数据。支持批处理模式和流式模式,支持并发写入,支持精确一次语义(使用 XA 事务保证)。
+
+## 使用依赖
+
+### 对于 Spark/Flink 引擎
+
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/org.postgresql/postgresql) 已放置在目录
`${SEATUNNEL_HOME}/plugins/` 中。
+
+### 对于 SeaTunnel Zeta 引擎
+
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/org.postgresql/postgresql) 已放置在目录
`${SEATUNNEL_HOME}/lib/` 中。
+
+## 主要特性
+
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [x] [变更数据捕获(CDC)](../../concept/connector-v2-features.md)
+
+> 使用 `XA 事务` 来确保 `精确一次`。因此,仅对支持 `XA 事务` 的数据库支持 `精确一次`。您可以设置
`is_exactly_once=true` 来启用此功能。
+
+## 支持的数据源信息
+| 数据源 | 支持的版本 | 驱动
| URL |
Maven |
+|--------------|-----------------------------------------------------|----------------------|---------------------------------------|--------------------------------------------------------------------------|
+| PostgreSQL | 不同的依赖版本有不同的驱动类。 | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+| PostgreSQL | 如果您想在 PostgreSQL 中处理 GEOMETRY 类型。 |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
+
+## 数据库依赖
+
+> 请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATUNNEL_HOME/plugins/jdbc/lib/' 工作目录中。<br/>
+> 例如 PostgreSQL 数据源:`cp postgresql-xxx.jar
$SEATUNNEL_HOME/plugins/jdbc/lib/`<br/>
+> 如果您想在 PostgreSQL 中处理 GEOMETRY 类型,请将 `postgresql-xxx.jar` 和
`postgis-jdbc-xxx.jar` 添加到 `$SEATUNNEL_HOME/plugins/jdbc/lib/` 中。
+
+## 数据类型映射
+| PostgreSQL 数据类型
|
SeaTunnel 数据类型 |
+|--------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL<br/>
| BOOLEAN
|
+| _BOOL<br/>
| ARRAY<BOOLEAN>
|
+| BYTEA<br/>
| BYTES
|
+| _BYTEA<br/>
| ARRAY<TINYINT>
|
+| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL<br/>
| INT
|
+| _INT2<br/>_INT4<br/>
| ARRAY<INT>
|
+| INT8<br/>BIGSERIAL<br/>
| BIGINT
|
+| _INT8<br/>
| ARRAY<BIGINT>
|
+| FLOAT4<br/>
| FLOAT
|
+| _FLOAT4<br/>
| ARRAY<FLOAT>
|
+| FLOAT8<br/>
| DOUBLE
|
+| _FLOAT8<br/>
| ARRAY<DOUBLE>
|
+| NUMERIC(指定列的列大小>0)
| DECIMAL(指定列的列大小,获取指定列小数点右侧的数字位数)
|
+| NUMERIC(指定列的列大小<0)
| DECIMAL(38, 18)
|
+|
BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY<br/>JSON<br/>JSONB<br/>UUID
| STRING
|
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT
| ARRAY<STRING>
|
+| TIMESTAMP<br/>
| TIMESTAMP
|
+| TIME<br/>
| TIME
|
+| DATE<br/>
| DATE
|
+| 其他数据类型
| 目前不支持
|
+
+## 选项
+
+| 名称 | 类型 | 必填 | 默认
|
描述
[...]
+|-------------------------------------------|---------|------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| url | String | 是 | -
| JDBC 连接的 URL。参见示例:jdbc:postgresql://localhost:5432/test <br/>
如果您使用 json 或 jsonb 类型插入,请添加 jdbc url 字符串 `stringtype=unspecified` 选项。
[...]
+| driver | String | 是 | -
| 用于连接远程数据源的 JDBC 类名,<br/> 如果使用 PostgreSQL,则该值为
`org.postgresql.Driver`。
[...]
+| user | String | 否 | -
| 连接实例的用户名。
[...]
+| password | String | 否 | -
| 连接实例的密码。
[...]
+| query | String | 否 | -
| 使用此 SQL 将上游输入数据写入数据库。例如 `INSERT ...`,`query` 的优先级更高。
[...]
+| database | String | 否 | -
| 使用此 `database` 和 `table-name` 自动生成 SQL,并接收上游输入数据写入数据库。<br/>此选项与
`query` 互斥,并具有更高的优先级。
[...]
+| table | String | 否 | -
| 使用数据库和此表名自动生成 SQL,并接收上游输入数据写入数据库。<br/>此选项与 `query`
互斥,并具有更高的优先级。表参数可以填写一个不想的表的名称,最终将作为创建表的表名,并支持变量(`${table_name}`,`${schema_name}`)。替换规则:
`${schema_name}` 将替换为传递给目标端的 SCHEMA 名称,`${table_name}` 将替换为传递给目标端的表名称。 |
+| primary_keys | Array | 否 | -
| 此选项用于支持在自动生成 SQL 时进行 `insert`,`delete` 和 `update` 操作。
[...]
+| support_upsert_by_query_primary_key_exist | Boolean | 否 | false
| 选择使用 INSERT SQL,UPDATE SQL
根据查询主键存在来处理更新事件(INSERT,UPDATE_AFTER)。此配置仅在数据库不支持 upsert 语法时使用。**注意**:此方法性能较低。
[...]
+| connection_check_timeout_sec | Int | 否 | 30
| 用于验证连接的数据库操作完成的等待时间(秒)。
[...]
+| max_retries | Int | 否 | 0
| 提交失败的重试次数(executeBatch)。
[...]
+| batch_size | Int | 否 | 1000
| 对于批量写入,当缓冲记录的数量达到 `batch_size` 或时间达到
`checkpoint.interval`<br/>时,数据将刷新到数据库。
[...]
+| is_exactly_once | Boolean | 否 | false
| 是否启用精确一次语义,将使用 XA 事务。如果启用,您需要<br/>设置 `xa_data_source_class_name`。
[...]
+| generate_sink_sql | Boolean | 否 | false
| 根据要写入的数据库表生成 SQL 语句。
[...]
+| xa_data_source_class_name | String | 否 | -
| 数据库驱动的 XA 数据源类名,例如,PostgreSQL 是
`org.postgresql.xa.PGXADataSource`,并<br/>请参阅附录以获取其他数据源。
[...]
+| max_commit_attempts | Int | 否 | 3
| 事务提交失败的重试次数。
[...]
+| transaction_timeout_sec | Int | 否 | -1
| 事务开启后的超时时间,默认值为 -1(永不超时)。注意设置超时可能会影响<br/>精确一次语义。
[...]
+| auto_commit | Boolean | 否 | true
| 默认启用自动事务提交。
[...]
+| field_ide | String | 否 | -
| 识别字段在从源到汇的同步时是否需要转换。`ORIGINAL` 表示无需转换;`UPPERCASE`
表示转换为大写;`LOWERCASE` 表示转换为小写。
[...]
+| properties | Map | 否 | -
| 附加连接配置参数,当 properties 和 URL 具有相同参数时,优先级由<br/>驱动的具体实现决定。例如,在 MySQL
中,properties 优先于 URL。
[...]
+| common-options | | 否 | -
| Sink 插件的公共参数,请参阅 [Sink 公共选项](../sink-common-options.md) 以获取详细信息。
[...]
+| schema_save_mode | Enum | 否 |
CREATE_SCHEMA_WHEN_NOT_EXIST | 在同步任务开启之前,根据目标端现有表结构选择不同处理方案。
[...]
+| data_save_mode | Enum | 否 | APPEND_DATA
| 在同步任务开启之前,根据目标端现有数据选择不同处理方案。
[...]
+| custom_sql | String | 否 | -
| 当 `data_save_mode` 选择 `CUSTOM_PROCESSING` 时,您应该填写 `CUSTOM_SQL`
参数。此参数通常填入可执行的 SQL。SQL 将在同步任务之前执行。
[...]
+| enable_upsert | Boolean | 否 | true
| 通过主键存在启用 upsert,如果任务没有重复数据,设置此参数为 `false` 可以加快数据导入。
[...]
+
+### table [字符串]
+
+使用 `database` 和此 `table-name` 自动生成 SQL,并接收上游输入数据写入数据库。
+
+此选项与 `query` 互斥,并具有更高的优先级。
+
+表参数可以填写一个不想的表的名称,最终将作为创建表的表名,并支持变量(`${table_name}`,`${schema_name}`)。替换规则:`${schema_name}`
将替换为传递给目标端的 SCHEMA 名称,`${table_name}` 将替换为传递给目标端的表名称。
+
+例如:
+1. `${schema_name}.${table_name} _test`
+2. `dbo.tt_${table_name} _sink`
+3. `public.sink_table`
+
+### schema_save_mode [枚举]
+
+在同步任务开启之前,根据目标端现有表结构选择不同处理方案。
+选项介绍:
+`RECREATE_SCHEMA` :当表不存在时将创建,保存时删除并重建。
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时创建,保存时跳过。
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :当表不存在时报告错误。
+`IGNORE` :忽略对表的处理。
+
+### data_save_mode [枚举]
+
+在同步任务开启之前,根据目标端现有数据选择不同处理方案。
+选项介绍:
+`DROP_DATA`:保留数据库结构并删除数据。
+`APPEND_DATA`:保留数据库结构,保留数据。
+`CUSTOM_PROCESSING`:用户定义处理。
+`ERROR_WHEN_DATA_EXISTS`:当存在数据时报告错误。
+### custom_sql [字符串]
+
+当 `data_save_mode` 选择 `CUSTOM_PROCESSING` 时,您应该填写 `CUSTOM_SQL` 参数。此参数通常填入可以执行的
SQL。SQL 将在同步任务之前执行。
+
+### 提示
+
+> 如果未设置 `partition_column`,它将以单线程并发运行;如果设置了 `partition_column`,它将根据任务的并发性并行执行。
+
+## 任务示例
+
+### 简单示例:
+
+> 此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并将其发送到 JDBC Sink。FakeSource 生成总共
16 行数据(`row.num=16`),每行有两个字段,`name`(字符串类型)和 `age`(整数类型)。最终目标表 `test_table` 也将包含
16 行数据。在运行此作业之前,您需要在 PostgreSQL 中创建数据库 `test` 和表 `test_table`。如果您还未安装和部署
SeaTunnel,请按照 [安装 SeaTunnel](../../start-v2/locally/deployment.md)
中的说明进行安装和部署。然后按照 [快速开始 SeaTunnel
引擎](../../start-v2/locally/quick-start-seatunnel-engine.md) 中的说明运行此作业。
+
+```
+# Defining the runtime environment
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ parallelism = 1
+ plugin_output = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform-v2
+}
+
+sink {
+ jdbc {
+ # if you would use json or jsonb type insert please add jdbc url
stringtype=unspecified option
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = root
+ password = 123456
+ query = "insert into test_table(name,age) values(?,?)"
+ }
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
+```
+
+### 生成 Sink SQL
+
+
+> 此示例不需要编写复杂的 SQL 语句,您可以配置数据库名称和表名称,系统将自动为您生成添加语句。
+
+```
+sink {
+ Jdbc {
+ # if you would use json or jsonb type insert please add jdbc url
stringtype=unspecified option
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = org.postgresql.Driver
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ database = test
+ table = "public.test_table"
+ }
+}
+```
+
+### 精确一次:
+
+> 对于精确写入场景,我们保证精确一次。
+
+```
+sink {
+ jdbc {
+ # if you would use json or jsonb type insert please add jdbc url
stringtype=unspecified option
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+
+ max_retries = 0
+ user = root
+ password = 123456
+ query = "insert into test_table(name,age) values(?,?)"
+
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+ }
+}
+```
+
+### CDC(变更数据捕获)事件
+
+> 我们也支持 CDC 变更数据。在这种情况下,您需要配置数据库、表和主键。
+
+```
+sink {
+ jdbc {
+ # if you would use json or jsonb type insert please add jdbc url
stringtype=unspecified option
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = test
+ table = sink_table
+ primary_keys = ["id","name"]
+ field_ide = UPPERCASE
+ }
+}
+```
+
+### 保存模式功能
+
+```
+sink {
+ Jdbc {
+ # if you would use json or jsonb type insert please add jdbc url
stringtype=unspecified option
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = org.postgresql.Driver
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ database = test
+ table = "public.test_table"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/source/PostgreSQL-CDC.md
b/docs/zh/connector-v2/source/PostgreSQL-CDC.md
new file mode 100644
index 0000000000..bf6c611733
--- /dev/null
+++ b/docs/zh/connector-v2/source/PostgreSQL-CDC.md
@@ -0,0 +1,193 @@
+# PostgreSQL CDC
+
+> PostgreSQL CDC 源连接器
+
+## 支持的引擎
+
+> SeaTunnel Zeta<br/>
+> Flink <br/>
+
+## 主要特性
+
+- [ ] [批处理](../../concept/connector-v2-features.md)
+- [x] [流处理](../../concept/connector-v2-features.md)
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [列投影](../../concept/connector-v2-features.md)
+- [x] [并行性](../../concept/connector-v2-features.md)
+- [x] [支持用户定义的拆分](../../concept/connector-v2-features.md)
+
+## 描述
+
+Postgre CDC 连接器允许从 Postgre 数据库读取快照数据和增量数据。本文件描述了如何设置 Postgre CDC 连接器,以便对
Postgre 数据库执行 SQL 查询。
+
+## 支持的数据源信息
+
+| 数据源 | 支持的版本 | 驱动
| Url |
Maven |
+|------------|-----------------------------------------------------|---------------------|---------------------------------------|--------------------------------------------------------------------------|
+| PostgreSQL | 不同的依赖版本有不同的驱动类。 | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY 类型。 |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
+
+## 使用依赖
+
+### 安装 Jdbc 驱动
+
+#### 对于 Spark/Flink 引擎
+
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/org.postgresql/postgresql) 已放置在目录
`${SEATUNNEL_HOME}/plugins/` 中。
+
+#### 对于 SeaTunnel Zeta 引擎
+
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/org.postgresql/postgresql) 已放置在目录
`${SEATUNNEL_HOME}/lib/` 中。
+
+请下载并将 PostgreSQL 驱动放入 `${SEATUNNEL_HOME}/lib/` 目录。例如:cp postgresql-xxx.jar
`$SEATUNNEL_HOME/lib/`
+
+> 以下是启用 PostgreSQL 中的 CDC(变化数据捕获)的步骤:
+
+1. 确保 wal_level 设置为 logical:通过在 postgresql.conf 配置文件中添加 "wal_level = logical"
来修改,重启 PostgreSQL 服务器以使更改生效。
+ 或者,您可以使用 SQL 命令直接修改配置:
+
+```sql
+ALTER SYSTEM SET wal_level TO 'logical';
+SELECT pg_reload_conf();
+```
+
+2. 将指定表的 REPLICA 策略更改为 FULL
+
+```sql
+ALTER TABLE your_table_name REPLICA IDENTITY FULL;
+```
+
+## 数据类型映射
+
+| PostgreSQL 数据类型
| SeaTunnel
数据类型 |
+|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL<br/>
| BOOLEAN
|
+| _BOOL<br/>
| ARRAY<BOOLEAN>
|
+| BYTEA<br/>
| BYTES
|
+| _BYTEA<br/>
| ARRAY<TINYINT>
|
+| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL<br/>
| INT
|
+| _INT2<br/>_INT4<br/>
| ARRAY<INT>
|
+| INT8<br/>BIGSERIAL<br/>
| BIGINT
|
+| _INT8<br/>
| ARRAY<BIGINT>
|
+| FLOAT4<br/>
| FLOAT
|
+| _FLOAT4<br/>
| ARRAY<FLOAT>
|
+| FLOAT8<br/>
| DOUBLE
|
+| _FLOAT8<br/>
| ARRAY<DOUBLE>
|
+| NUMERIC(指定列的列大小>0)
| DECIMAL(指定列的列大小, 获取指定列小数点右侧的位数)
|
+| NUMERIC(指定列的列大小<0)
| DECIMAL(38, 18)
|
+|
BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY<br/>JSON<br/>JSONB
| STRING
|
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT
| ARRAY<STRING>
|
+| TIMESTAMP<br/>
| TIMESTAMP
|
+| TIME<br/>
| TIME
|
+| DATE<br/>
| DATE
|
+| 其他数据类型
| 尚不支持
|
+
+## 源选项
+
+| 名称 | 类型 | 必需 | 默认 | 描述
[...]
+|------------------------------------------------|----------|------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| base-url | String | 是 | - |
JDBC 连接的
URL。参考案例:`jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF`。
[...]
+| username | String | 是 | - |
连接到数据库服务器时使用的数据库名称。
[...]
+| password | String | 是 | - |
连接到数据库服务器时使用的密码。
[...]
+| database-names | List | 否 | - |
需要监控的数据库名称。
[...]
+| table-names | List | 是 | - |
需要监控的数据库表名称。表名称需要包含数据库名称,例如:`database_name.table_name`。
[...]
+| table-names-config | List | 否 | - |
表配置列表。例如: [{"table": "db1.schema1.table1","primaryKeys":
["key1"],"snapshotSplitColumn": "key2"}]
[...]
+| startup.mode | List | 否 | INITIAL |
PostgreSQL CDC 消费者的可选启动模式,有效枚举为 `initial`、`earliest` 和 `latest`。<br/>
`initial`: 启动时同步历史数据,然后同步增量数据。<br/> `earliest`: 从可能的最早偏移量启动。<br/> `latest`:
从最新偏移量启动。
[...]
+| snapshot.split.size | Integer | 否 | 8096 |
表快照的拆分大小(行数),捕获的表在读取表快照时被拆分成多个拆分。
[...]
+| snapshot.fetch.size | Integer | 否 | 1024 |
读取表快照时每次轮询的最大获取大小。
[...]
+| slot.name | String | 否 | - |
为特定数据库/模式创建的用于流式传输更改的 PostgreSQL 逻辑解码槽的名称。服务器使用此槽将事件流式传输到您正在配置的连接器。默认值为
seatunnel。
[...]
+| decoding.plugin.name | String | 否 | pgoutput |
安装在服务器上的 Postgres 逻辑解码插件的名称,支持的值有
decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming 和
pgoutput。
[...]
+| server-time-zone | String | 否 | UTC |
数据库服务器中的会话时区。如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。
[...]
+| connect.timeout.ms | Duration | 否 | 30000 |
连接器在尝试连接到数据库服务器后应等待的最大时间,以防超时。
[...]
+| connect.max-retries | Integer | 否 | 3 |
连接器应重试建立数据库服务器连接的最大重试次数。
[...]
+| connection.pool.size | Integer | 否 | 20 |
JDBC 连接池大小。
[...]
+| chunk-key.even-distribution.factor.upper-bound | Double | 否 | 100 |
块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即 (MAX(id) - MIN(id) + 1) /
行数),则将优化表块以实现均匀分布。否则,如果分布因子更大,则将认为该表分布不均匀,并且如果估计的分片数量超过
`sample-sharding.threshold` 指定的值,则将使用基于采样的分片策略。默认值为 100.0。 |
+| chunk-key.even-distribution.factor.lower-bound | Double | 否 | 0.05 |
块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即 (MAX(id) - MIN(id) + 1) /
行数),则将优化表块以实现均匀分布。否则,如果分布因子更小,则将认为该表分布不均匀,并且如果估计的分片数量超过
`sample-sharding.threshold` 指定的值,则将使用基于采样的分片策略。默认值为 0.05。 |
+| sample-sharding.threshold | Integer | 否 | 1000 |
此配置指定触发采样分片策略的估计分片数量阈值。当分布因子超出由
`chunk-key.even-distribution.factor.upper-bound` 和
`chunk-key.even-distribution.factor.lower-bound` 指定的范围,且估计的分片数量(计算为近似行数 /
块大小)超过此阈值时,将使用采样分片策略。这可以帮助更有效地处理大数据集。默认值为 1000 个分片。
|
+| inverse-sampling.rate | Integer | 否 | 1000 |
在采样分片策略中使用的采样率的倒数。例如,如果此值设置为 1000,则意味着在采样过程中应用 1/1000
的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理非常大数据集时,较低的采样率尤为有用。默认值为 1000。
|
+| exactly_once | Boolean | 否 | false |
启用精确一次语义。
[...]
+| format | Enum | 否 | DEFAULT |
PostgreSQL CDC 的可选输出格式,有效枚举为 `DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`。
[...]
+| debezium | Config | 否 | - |
将 [Debezium
的属性](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/postgresql.adoc#connector-configuration-properties)
传递给用于捕获 PostgreSQL 服务器数据更改的 Debezium 嵌入式引擎。
[...]
+| common-options | | 否 | - |
源插件的公共参数,请参阅 [源公共选项](../source-common-options.md) 获取详细信息。
[...]
+
+## 任务示例
+
+### 简单
+
+> 支持多表读取
+
+```
+
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Postgres-CDC {
+ plugin_output = "customers_Postgre_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ table-names =
["postgres_cdc.inventory.postgres_cdc_table_1,postgres_cdc.inventory.postgres_cdc_table_2"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ }
+}
+
+transform {
+
+}
+
+sink {
+ jdbc {
+ plugin_input = "customers_Postgre_cdc"
+ url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = postgres_cdc
+ schema = "inventory"
+ tablePrefix = "sink_"
+ primary_keys = ["id"]
+ }
+}
+```
+
+### 支持自定义表的主键
+
+```
+source {
+ Postgres-CDC {
+ plugin_output = "customers_mysql_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["postgres_cdc.inventory.full_types_no_primary_key"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "decoderbufs"
+ exactly_once = false
+ table-names-config = [
+ {
+ table = "postgres_cdc.inventory.full_types_no_primary_key"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+```
+
+## 更新日志
+
+- 添加 PostgreSQL CDC 源连接器
+
+### 下一个版本
diff --git a/docs/zh/connector-v2/source/PostgreSQL.md
b/docs/zh/connector-v2/source/PostgreSQL.md
new file mode 100644
index 0000000000..1cd2f3de6a
--- /dev/null
+++ b/docs/zh/connector-v2/source/PostgreSQL.md
@@ -0,0 +1,307 @@
+# PostgreSQL
+
+> JDBC PostgreSQL 源连接器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 使用依赖
+
+### 对于 Spark/Flink 引擎
+
+> 1. 您需要确保 [jdbc 驱动的jar
包](https://mvnrepository.com/artifact/org.postgresql/postgresql) 已放置在目录
`${SEATUNNEL_HOME}/plugins/` 中。
+
+### 对于 SeaTunnel Zeta 引擎
+
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/org.postgresql/postgresql) 已放置在目录
`${SEATUNNEL_HOME}/lib/` 中。
+
+## 主要特性
+
+- [x] [批处理](../../concept/connector-v2-features.md)
+- [ ] [流处理](../../concept/connector-v2-features.md)
+- [x] [严格一次性](../../concept/connector-v2-features.md)
+- [x] [列投影](../../concept/connector-v2-features.md)
+- [x] [并行性](../../concept/connector-v2-features.md)
+- [x] [支持用户定义的拆分](../../concept/connector-v2-features.md)
+
+> 支持查询 SQL,并可以实现投影效果。
+
+## 描述
+
+通过 JDBC 读取外部数据源数据。
+
+## 支持的数据源信息
+
+| 数据源 | 支持的版本 | 驱动
| URL |
Maven |
+|----------------|----------------------------------------------------|---------------------|---------------------------------------|--------------------------------------------------------------------------|
+| PostgreSQL | 不同的依赖版本有不同的驱动类。 |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY 类型。 |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
+
+## 数据库依赖
+
+> 请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATUNNEL_HOME/plugins/jdbc/lib/' 工作目录中<br/>
+> 例如,对于 PostgreSQL 数据源: cp postgresql-xxx.jar
$SEATUNNEL_HOME/plugins/jdbc/lib/<br/>
+> 如果您想在 PostgreSQL 中操作 GEOMETRY 类型,请将 postgresql-xxx.jar 和
postgis-jdbc-xxx.jar 添加到 $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## 数据类型映射
+
+| PostgreSQL 数据类型
|
SeaTunnel 数据类型 |
+|--------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL<br/>
| BOOLEAN
|
+| _BOOL<br/>
| ARRAY<BOOLEAN>
|
+| BYTEA<br/>
| BYTES
|
+| _BYTEA<br/>
| ARRAY<TINYINT>
|
+| INT2<br/>SMALLSERIAL
| SMALLINT
|
+| _INT2
| ARRAY<SMALLINT>
|
+| INT4<br/>SERIAL<br/>
| INT
|
+| _INT4<br/>
| ARRAY<INT>
|
+| INT8<br/>BIGSERIAL<br/>
| BIGINT
|
+| _INT8<br/>
| ARRAY<BIGINT>
|
+| FLOAT4<br/>
| FLOAT
|
+| _FLOAT4<br/>
| ARRAY<FLOAT>
|
+| FLOAT8<br/>
| DOUBLE
|
+| _FLOAT8<br/>
| ARRAY<DOUBLE>
|
+| NUMERIC(指定列的列大小>0)
| DECIMAL(指定列的列大小,获取指定列小数点右侧的数字位数)
|
+| NUMERIC(指定列的列大小<0)
| DECIMAL(38, 18)
|
+|
BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY<br/>JSON<br/>JSONB<br/>UUID
| STRING
|
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT
| ARRAY<STRING>
|
+| TIMESTAMP(s)<br/>TIMESTAMPTZ(s)
| TIMESTAMP(s)
|
+| TIME(s)<br/>TIMETZ(s)
| TIME(s)
|
+| DATE<br/>
| DATE
|
+
+## 选项
+
+| 名称 | 类型 | 必需 | 默认 |
描述
[...]
+|----------------------------------------------|------------|------|-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| url | String | 是 | -
| JDBC 连接的 URL。参考示例:jdbc:postgresql://localhost:5432/test
[...]
+| driver | String | 是 | -
| 用于连接到远程数据源的 JDBC 类名,<br/> 如果您使用 MySQL,则值为 `com.mysql.cj.jdbc.Driver`。
[...]
+| user | String | 否 | -
| 连接实例的用户名
[...]
+| password | String | 否 | -
| 连接实例的密码
[...]
+| query | String | 是 | -
| 查询语句
[...]
+| connection_check_timeout_sec | Int | 否 | 30
| 用于验证连接的数据库操作完成的等待时间(秒)
[...]
+| partition_column | String | 否 | -
| 用于并行化的分区列名,仅支持数字类型,<br/> 仅支持数字类型主键,并且只能配置一列。
[...]
+| partition_lower_bound | BigDecimal | 否 | -
| 扫描的 partition_column 的最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
[...]
+| partition_upper_bound | BigDecimal | 否 | -
| 扫描的 partition_column 的最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
[...]
+| partition_num | Int | 否 | 作业并行性
| 分区数量,仅支持正整数。默认值为作业并行性
[...]
+| fetch_size | Int | 否 | 0
| 对于返回大量对象的查询,您可以配置<br/> 用于查询的行抓取大小,以通过减少所需的数据库访问次数来提高性能。<br/> 0 表示使用
JDBC 默认值。
[...]
+| properties | Map | 否 | -
| 其他连接配置参数,当属性和 URL 具有相同参数时,<br/> 优先级由驱动程序的具体实现决定。在 MySQL 中,属性优先于 URL。
[...]
+| table_path | String | 否 | -
| 表的完整路径,您可以使用此配置替代 `query`。<br/> 示例:<br/> mysql: "testdb.table1" <br/>
oracle: "test_schema.table1" <br/> sqlserver: "testdb.test_schema.table1" <br/>
postgresql: "testdb.test_schema.table1"
[...]
+| table_list | Array | 否 | -
| 要读取的表列表,您可以使用此配置替代 `table_path` 示例:```[{ table_path =
"testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name
from testdb.table2"}]```
[...]
+| where_condition | String | 否 | -
| 所有表/查询的通用行过滤条件,必须以 `where` 开头。 例如 `where id > 100`
[...]
+| split.size | Int | 否 | 8096
| 表的拆分大小(行数),被捕获的表在读取时被拆分为多个拆分。
[...]
+| split.even-distribution.factor.lower-bound | Double | 否 | 0.05
| 块键分布因子的下限。此因子用于确定表数据是否均匀分布。<br/> 如果计算出的分布因子大于或等于此下限(即 (MAX(id) -
MIN(id) + 1) / 行数),则表块将优化为均匀分布。否则,如果分布因子较小,则将视为不均匀分布,当估计的分片数超过
`sample-sharding.threshold` 指定的值时,将使用基于采样的分片策略。默认值为 0.05。 |
+| split.even-distribution.factor.upper-bound | Double | 否 | 100
| 块键分布因子的上限。此因子用于确定表数据是否均匀分布。<br/> 如果计算出的分布因子小于或等于此上限(即 (MAX(id) -
MIN(id) + 1) / 行数),则表块将优化为均匀分布。否则,如果分布因子较大,则将视为不均匀分布,当估计的分片数超过
`sample-sharding.threshold` 指定的值时,将使用基于采样的分片策略。默认值为 100.0。 |
+| split.sample-sharding.threshold | Int | 否 | 10000
| 此配置指定触发样本分片策略的估计分片数阈值。<br/> 当分布因子超出
`chunk-key.even-distribution.factor.upper-bound` 和
`chunk-key.even-distribution.factor.lower-bound` 指定的范围时,且估计的分片数(计算为近似行数 /
块大小)超过此阈值,将使用样本分片策略。这可以帮助更高效地处理大数据集。默认值为 1000 个分片。
|
+| split.inverse-sampling.rate | Int | 否 | 1000
| 在样本分片策略中使用的采样率的逆数。例如,如果此值设置为 1000,表示在采样过程中应用 1/1000
的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理非常大的数据集时,较低的采样率尤其有用。默认值为 1000。
|
+|
+## 并行读取器
+
+JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用某些规则来拆分表中的数据,这些数据将交给读取器进行读取。读取器的数量由
`parallelism` 选项确定。
+
+**拆分键规则:**
+
+1. 如果 `partition_column` 不为 null,将用于计算拆分。该列必须属于 **支持的拆分数据类型**。
+2. 如果 `partition_column` 为 null,SeaTunnel
将从表中读取模式并获取主键和唯一索引。如果主键和唯一索引中有多列,则使用第一个属于 **支持的拆分数据类型** 的列来拆分数据。例如,表有主键(nn
guid, name varchar),因为 `guid` 不在 **支持的拆分数据类型** 中,因此将使用列 `name` 来拆分数据。
+
+**支持的拆分数据类型:**
+* 字符串
+* 数字(int, bigint, decimal, ...)
+* 日期
+
+### 与拆分相关的选项
+
+#### split.size
+
+每个拆分中有多少行,当读取表时,被捕获的表将拆分为多个拆分。
+
+#### split.even-distribution.factor.lower-bound
+
+> 不推荐使用
+
+块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即 (MAX(id) - MIN(id) + 1) /
行数),则表块将优化为均匀分布。否则,如果分布因子较小,则将视为不均匀分布,当估计的分片数超过 `sample-sharding.threshold`
指定的值时,将使用基于采样的分片策略。默认值为 0.05。
+
+#### split.even-distribution.factor.upper-bound
+
+> 不推荐使用
+
+块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即 (MAX(id) - MIN(id) + 1) /
行数),则表块将优化为均匀分布。否则,如果分布因子较大,则将视为不均匀分布,当估计的分片数超过 `sample-sharding.threshold`
指定的值时,将使用基于采样的分片策略。默认值为 100.0。
+
+#### split.sample-sharding.threshold
+
+此配置指定触发样本分片策略的估计分片数阈值。当分布因子超出 `chunk-key.even-distribution.factor.upper-bound`
和 `chunk-key.even-distribution.factor.lower-bound` 指定的范围时,且估计的分片数(计算为近似行数 /
块大小)超过此阈值,将使用样本分片策略。这可以帮助更高效地处理大数据集。默认值为 1000 个分片。
+
+#### split.inverse-sampling.rate
+
+在样本分片策略中使用的采样率的逆数。例如,如果此值设置为 1000,表示在采样过程中应用 1/1000
的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理非常大的数据集时,较低的采样率尤其有用。默认值为 1000。
+
+#### partition_column [字符串]
+
+用于拆分数据的列名。
+
+#### partition_upper_bound [BigDecimal]
+
+扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
+
+#### partition_lower_bound [BigDecimal]
+
+扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
+
+#### partition_num [整数]
+
+> 不推荐使用,正确的方法是通过 `split.size` 控制拆分数量
+
+我们需要拆分成多少个拆分,仅支持正整数。默认值为作业并行性。
+
+## 提示
+
+> 如果表无法拆分(例如,表没有主键或唯一索引,并且未设置 `partition_column`),将以单一并发运行。
+>
+> 使用 `table_path` 替代 `query` 进行单表读取。如果需要读取多个表,请使用 `table_list`。
+
+## 任务示例
+
+### 简单示例:
+
+> 此示例查询您测试 "database" 中 type_bin 为 'table' 的 16
条数据,并以单并行方式查询其所有字段。您还可以指定要查询的字段,以便最终输出到控制台。
+
+```
+# Defining the runtime environment
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+
+source{
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from source limit 16"
+ }
+}
+
+transform {
+ # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+ Console {}
+}
+```
+
+### 按 partition_column 并行读取
+
+> 使用您配置的分片字段和分片数据并行读取查询表。如果您想要读取整个表,可以这样做。
+
+```
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+source{
+ jdbc{
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from source"
+ partition_column= "id"
+ partition_num = 5
+ }
+}
+sink {
+ Console {}
+}
+```
+
+### 按主键或唯一索引并行读取
+
+> 配置 `table_path` 将启用自动拆分,您可以配置 `split.*` 来调整拆分策略。
+
+```
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+source {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ connection_check_timeout_sec = 100
+ user = "root"
+ password = "123456"
+ table_path = "test.public.AllDataType_1"
+ query = "select * from public.AllDataType_1"
+ split.size = 10000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
+### 并行边界:
+
+> 在查询中指定上下边界内的数据更为高效。根据您配置的上下边界读取数据源将更为高效。
+
+```
+source{
+ jdbc{
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from source"
+ partition_column= "id"
+
+ # The name of the table returned
+ plugin_output = "jdbc"
+ partition_lower_bound = 1
+ partition_upper_bound = 50
+ partition_num = 5
+ }
+}
+```
+
+### 多表读取:
+
+***配置 `table_list` 将启用自动拆分,您可以配置 `split.*` 来调整拆分策略***
+
+```hocon
+env {
+ job.mode = "BATCH"
+ parallelism = 4
+}
+source {
+ Jdbc {
+ url="jdbc:postgresql://datasource01:5432/demo"
+ user="iDm82k6Q0Tq+wUprWnPsLQ=="
+ driver="org.postgresql.Driver"
+ password="iDm82k6Q0Tq+wUprWnPsLQ=="
+ "table_list"=[
+ {
+ "table_path"="demo.public.AllDataType_1"
+ },
+ {
+ "table_path"="demo.public.alldatatype"
+ }
+ ]
+ #where_condition= "where id > 100"
+ split.size = 10000
+ #split.even-distribution.factor.upper-bound = 100
+ #split.even-distribution.factor.lower-bound = 0.05
+ #split.sample-sharding.threshold = 1000
+ #split.inverse-sampling.rate = 1000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
