This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch release-3.4
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

The following commit(s) were added to refs/heads/release-3.4 by this push:
     new a2bf68e42  [FLINK-37347][DOCS] Support any column as chunk key column (postgres, oracle, db2, sqlserver) (#3924) (#4028)
a2bf68e42 is described below

commit a2bf68e424f5a992b2c44be05ca72043a8fe6865
Author: SeungMin <semi...@gmail.com>
AuthorDate: Tue May 27 14:36:57 2025 +0900

    [FLINK-37347][DOCS] Support any column as chunk key column (postgres, oracle, db2, sqlserver) (#3924) (#4028)

    Co-authored-by: Hang Ruan <ruanhang1...@hotmail.com>
---
 .../docs/connectors/flink-sources/db2-cdc.md       | 37 ++++++++++++++++++-
 .../docs/connectors/flink-sources/mysql-cdc.md     | 36 +++++++++++++++++++
 .../docs/connectors/flink-sources/oracle-cdc.md    | 41 ++++++++++++++++++++-
 .../docs/connectors/flink-sources/postgres-cdc.md  | 42 ++++++++++++++++++++--
 .../docs/connectors/flink-sources/sqlserver-cdc.md | 41 ++++++++++++++++++++-
 .../docs/connectors/flink-sources/db2-cdc.md       | 39 +++++++++++++++++++-
 .../docs/connectors/flink-sources/mysql-cdc.md     | 38 ++++++++++++++++++--
 .../docs/connectors/flink-sources/oracle-cdc.md    | 40 ++++++++++++++++++++-
 .../docs/connectors/flink-sources/postgres-cdc.md  | 42 ++++++++++++++++++++--
 .../docs/connectors/flink-sources/sqlserver-cdc.md | 40 ++++++++++++++++++++-
 10 files changed, 384 insertions(+), 12 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md index 25e17d327..be9435d63 100644 --- a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md @@ -241,7 +241,7 @@ for more detailed information.</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td> + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance.</td> </tr> <tr> <td>debezium.*</td> @@ -425,6 +425,41 @@ public class Db2ParallelSourceExample { } ``` +### Tables Without primary keys + +Starting from version 3.4.0, DB2 CDC supports tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two things that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index as `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of the select statements. +2. The processing semantics of a DB2 CDC table without primary keys is determined by the behavior of the column that is specified by `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, exactly-once semantics are ensured. +* If update operations are performed on the specified column, only at-least-once semantics are ensured. However, you can specify primary keys downstream and perform idempotent operations to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a DB2 table with primary keys may lead to data inconsistencies.
Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + Data Type Mapping ---------------- diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 7ae1ae449..a3878cc6c 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -255,6 +255,17 @@ Flink SQL> SELECT * FROM orders; <td>Integer</td> <td>读取表快照时每次读取数据的最大条数。</td> </tr> + <tr> + <td>scan.incremental.snapshot.chunk.key-column</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>表快照的分片键,在读取表的快照时,被捕获的表会按分片键拆分为多个分片。 + 默认情况下,分片键是主键的第一列。可以使用非主键列作为分片键,但这可能会导致查询性能下降。 + <br> + <b>警告:</b> 使用非主键列作为分片键可能会导致数据不一致。请参阅 <a href="#警告">警告</a> 了解详细信息。 + </td> + </tr> <tr> <td>scan.startup.mode</td> <td>optional</td> @@ -809,6 +820,31 @@ $ ./bin/flink run \ * 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。 * 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。 +#### 警告 + +在 MySQL 表中,若使用 **非主键列** 作为有主键表的 `scan.incremental.snapshot.chunk.key-column`,可能导致**数据不一致**。以下为可能出现的问题及其缓解方案。 + +#### 问题场景 + +- **表结构:** + - **主键:** `id` + - **分片键列 :** `pid`(非主键) + +- **快照分片 :** + - **分片 0:** `1 < pid <= 3` + - **分片 1:** `3 < pid <= 5` + +- **操作 :** + - 两个子任务并行读取 **分片 0** 和 **分片 1**。 + - 在读取过程中,发生了一次 **更新** 操作,使 `id=0` 的 `pid` 从 `2` 变为 `4`,在两个分片的**高低水位**间都包含此次变更,导致该更新操作在增量阶段不会被处理。 + +- **结果 :** + - **分片 0:** 记录 `[id=0, pid=2]` + - **分片 1:** 记录 `[id=0, pid=4]` + +由于**处理顺序**无法保证,最终 `id=0` 的 `pid` 可能为 `2` 或 `4`,从而导致数据不一致。 + + ### 可用的指标 指标系统能够帮助了解分片分发的进展, 下面列举出了支持的 Flink 指标 [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/): diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md index 368b8fe18..8c44d44d0 100644 --- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md @@ -420,7 +420,10 @@ Connector Options <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is 'ROWID'. This column must be a column of the primary key.</td> + By default, the chunk key is 'ROWID'. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see <a href="#warning">Warning</a> for details. 
+ </td> </tr> <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> @@ -645,6 +648,42 @@ public class OracleSourceExample { 1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。 2. 对于 Oracle,Group 的名称会类似于 `test_database.test_schema.test_table`。 +### Tables Without primary keys + +Starting from version 3.4.0, Oracle CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of an Oracle CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for an oracle table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + + Data Type Mapping ---------------- <div class="wy-table-responsive"> diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index 25bfacede..dfd3a9d43 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -343,7 +343,10 @@ The following options is available only when `scan.incremental.snapshot.enabled= <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td> + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. 
Please see <a href="#warning">Warning</a> for details. + </td> </tr> <tr> <td>chunk-key.even-distribution.factor.lower-bound</td> @@ -454,7 +457,7 @@ Incremental snapshot reading is a new mechanism to read snapshot of a table. Com * (2) PostgreSQL CDC Source can perform checkpoints in the chunk granularity during snapshot reading * (3) PostgreSQL CDC Source doesn't need to acquire global read lock before snapshot reading -During the incremental snapshot reading, the PostgreSQL CDC Source firstly splits snapshot chunks (splits) by primary key of table, +During the incremental snapshot reading, the PostgreSQL CDC Source firstly splits snapshot chunks (splits) by user specified chunk key of table, and then PostgreSQL CDC Source assigns the chunks to multiple readers to read the data of snapshot chunk. ### Exactly-Once Processing @@ -578,6 +581,41 @@ public class PostgreSQLSourceExample { 1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。 2. 对于 PostgreSQL,Group 的名称会类似于 `test_database.test_schema.test_table`。 +### Tables Without primary keys + +Starting from version 3.4.0, Postgres CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of a Postgres CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a Postgres table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. 
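To make the configuration described in this Postgres section concrete, here is a minimal Flink SQL sketch of a source table that has no primary key, where a non-null column is nominated as the chunk key. The `shipments` table, its columns, and all connection values are placeholders assumed for illustration; the option names follow the connector option table above.

```sql
-- Sketch (placeholders): a Postgres table without a primary key.
-- Incremental snapshot reading splits the table into chunks, so a non-null
-- chunk key column must be supplied explicitly.
CREATE TABLE shipments (
    shipment_id INT NOT NULL,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'shipments',
    'slot.name' = 'flink',
    -- Chunk-based snapshot reading requires the incremental snapshot framework.
    'scan.incremental.snapshot.enabled' = 'true',
    -- No primary key exists, so pick a non-null (ideally indexed) column.
    'scan.incremental.snapshot.chunk.key-column' = 'shipment_id'
);
```

If `shipment_id` is never updated, exactly-once semantics still hold; if it can be updated, only at-least-once delivery is guaranteed, as explained above.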
+ Data Type Mapping ---------------- diff --git a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md index 49b97f2e2..6199506f7 100644 --- a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md @@ -236,7 +236,10 @@ Connector Options <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td> + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see <a href="#warning">Warning</a> for details. + </td> </tr> <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> @@ -440,6 +443,42 @@ public class SqlServerIncrementalSourceExample { 1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。 2. 对于 SQLServer,Group 的名称会类似于 `test_database.test_schema.test_table`。 +### Tables Without primary keys + +Starting from version 3.4.0, SQLServer CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of a SQLServer CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a SQLServer table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. 
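For SQLServer, a similar sketch of the warning case discussed above: the table does have primary key `id`, but the indexed non-primary-key column `pid` is chosen as the chunk key. The table layout and connection values are placeholders, and the exact connection options may vary between versions; `scan.incremental.snapshot.chunk.key-column` is the option under discussion.

```sql
-- Sketch (placeholders): primary key is id, but snapshot chunks are split on pid.
-- Splitting on an indexed pid can speed up the snapshot queries, yet it carries the
-- inconsistency risk described above if pid itself is updated during the snapshot.
CREATE TABLE orders (
    id INT,
    pid INT,
    order_date DATE,
    customer_name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'table-name' = 'dbo.orders',
    'scan.incremental.snapshot.enabled' = 'true',
    -- Non-primary-key chunk key: prefer a column covered by an index.
    'scan.incremental.snapshot.chunk.key-column' = 'pid'
);
```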
+ + Data Type Mapping ---------------- diff --git a/docs/content/docs/connectors/flink-sources/db2-cdc.md b/docs/content/docs/connectors/flink-sources/db2-cdc.md index 09d7895fe..e9878db26 100644 --- a/docs/content/docs/connectors/flink-sources/db2-cdc.md +++ b/docs/content/docs/connectors/flink-sources/db2-cdc.md @@ -241,7 +241,7 @@ for more detailed information.</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td> + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance.</td> </tr> <tr> <td>debezium.*</td> @@ -425,6 +425,43 @@ Notice: 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name. 2. For DB2, the group name will be like `test_database.test_schema.test_table`. +### Tables Without primary keys + +Starting from version 3.4.0, DB2 CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of a DB2 CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a DB2 table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. 
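As a companion to point 2 in the DB2 section above, which notes that only at-least-once semantics can be guaranteed when the chunk key column is updated, here is a minimal Flink SQL sketch of the suggested mitigation: give the downstream table a primary key so that writes become idempotent upserts. The `products` source table is assumed to be an already-registered DB2 CDC table whose `scan.incremental.snapshot.chunk.key-column` points at a non-null column; the JDBC sink, its URL, and the credentials are placeholders for illustration only.

```sql
-- Sketch (placeholders): an upsert sink keyed by the table's real primary key.
-- Even if the CDC source delivers some records more than once (at-least-once),
-- repeated upserts collapse to a single row per key in the target table.
CREATE TABLE products_sink (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/target_db',
    'table-name' = 'products',
    'username' = 'target_user',
    'password' = 'target_pw'
);

-- products is the previously registered DB2 CDC source table.
INSERT INTO products_sink
SELECT id, name, description FROM products;
```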
+ + + Data Type Mapping ---------------- diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 2a7f3ca72..b9713ed47 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -254,6 +254,17 @@ Connector Options <td>Integer</td> <td>The maximum fetch size for per poll when read table snapshot.</td> </tr> + <tr> + <td>scan.incremental.snapshot.chunk.key-column</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see <a href="#warning">Warning</a> for details. + </td> + </tr> <tr> <td>scan.startup.mode</td> <td>optional</td> @@ -316,8 +327,7 @@ Connector Options </td> </tr> <tr> - <td>debezium.min.row. - count.to.stream.result</td> + <td>debezium.min.row.count.to.stream.result</td> <td>optional</td> <td style="word-wrap: break-word;">1000</td> <td>Integer</td> @@ -846,6 +856,30 @@ There are two places that need to be taken care of. * If no update operation is performed on the specified column, the exactly-once semantics is ensured. * If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a MySQL table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + ### About converting binary type data to base64 encoded data ```sql diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md b/docs/content/docs/connectors/flink-sources/oracle-cdc.md index 769421899..80a9b10d8 100644 --- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md +++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md @@ -421,7 +421,10 @@ Connector Options <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is 'ROWID'. 
This column must be a column of the primary key.</td> + By default, the chunk key is 'ROWID'. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see <a href="#warning">Warning</a> for details. + </td> </tr> <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> @@ -646,6 +649,41 @@ Notice: 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name. 2. For Oracle, the group name will be like `test_database.test_schema.test_table`. +### Tables Without primary keys + +Starting from version 3.4.0, Oracle CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of an Oracle CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for an oracle table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + Data Type Mapping ---------------- <div class="wy-table-responsive"> diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index b8e476801..3bf19923f 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -340,7 +340,10 @@ The following options is available only when `scan.incremental.snapshot.enabled= <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. 
- By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td> + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see <a href="#warning">Warning</a> for details. + </td> </tr> <tr> <td>chunk-key.even-distribution.factor.lower-bound</td> @@ -455,7 +458,7 @@ Incremental snapshot reading is a new mechanism to read snapshot of a table. Com * (2) PostgreSQL CDC Source can perform checkpoints in the chunk granularity during snapshot reading * (3) PostgreSQL CDC Source doesn't need to acquire global read lock before snapshot reading -During the incremental snapshot reading, the PostgreSQL CDC Source firstly splits snapshot chunks (splits) by primary key of table, +During the incremental snapshot reading, the PostgreSQL CDC Source firstly splits snapshot chunks (splits) by user specified chunk key of table, and then PostgreSQL CDC Source assigns the chunks to multiple readers to read the data of snapshot chunk. ### Exactly-Once Processing @@ -579,6 +582,41 @@ Notice: 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name. 2. For PostgreSQL, the group name will be like `test_database.test_schema.test_table`. +### Tables Without primary keys + +Starting from version 3.4.0, Postgres CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of a Postgres CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a Postgres table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. 
+ +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + ## Data Type Mapping <div class="wy-table-responsive"> diff --git a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md index 683ed15a3..2374617f6 100644 --- a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md +++ b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md @@ -236,7 +236,10 @@ Connector Options <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. - By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td> + By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. + <br> + <b>Warning:</b> Using a non-primary key column as a chunk key may lead to data inconsistencies. Please see <a href="#warning">Warning</a> for details. + </td> </tr> <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> @@ -420,6 +423,41 @@ public class SqlServerIncrementalSourceExample { } ``` +### Tables Without primary keys + +Starting from version 3.4.0, SQLServer CDC support tables that do not have a primary key. To use a table without primary keys, you must configure the `scan.incremental.snapshot.chunk.key-column` option and specify one non-null field. + +There are two places that need to be taken care of. + +1. If there is an index in the table, try to use a column which is contained in the index in `scan.incremental.snapshot.chunk.key-column`. This will increase the speed of select statement. +2. The processing semantics of a SQLServer CDC table without primary keys is determined based on the behavior of the column that are specified by the `scan.incremental.snapshot.chunk.key-column`. +* If no update operation is performed on the specified column, the exactly-once semantics is ensured. +* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. + +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a SQLServer table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. This update occurs between the low and high watermark of both splits. 
+ +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + ### Available Source metrics Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):