This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 81f251898 [FLINK-37828] Enable scan.incremental.snapshot.unbounded-chunk-first by default for improved stability (#4082) 81f251898 is described below commit 81f251898e326edddebedcb3c709bf702a3b3c6f Author: Junbo Wang <beryllw...@gmail.com> AuthorDate: Fri Aug 8 11:49:25 2025 +0800 [FLINK-37828] Enable scan.incremental.snapshot.unbounded-chunk-first by default for improved stability (#4082) --- .../docs/connectors/flink-sources/db2-cdc.md | 3 +- .../docs/connectors/flink-sources/mongodb-cdc.md | 3 +- .../docs/connectors/flink-sources/mysql-cdc.md | 3 +- .../docs/connectors/flink-sources/oracle-cdc.md | 3 +- .../docs/connectors/flink-sources/postgres-cdc.md | 3 +- .../docs/connectors/flink-sources/sqlserver-cdc.md | 3 +- .../docs/connectors/flink-sources/db2-cdc.md | 3 +- .../docs/connectors/flink-sources/mongodb-cdc.md | 3 +- .../docs/connectors/flink-sources/mysql-cdc.md | 3 +- .../docs/connectors/flink-sources/oracle-cdc.md | 3 +- .../docs/connectors/flink-sources/postgres-cdc.md | 3 +- .../docs/connectors/flink-sources/sqlserver-cdc.md | 3 +- .../cdc/connectors/base/options/SourceOptions.java | 5 ++-- .../mysql/source/config/MySqlSourceOptions.java | 5 ++-- .../source/fetch/PostgresScanFetchTaskTest.java | 22 +++++++++++++- .../reader/fetch/SqlServerScanFetchTaskTest.java | 34 ++++++++++++++++++++-- 16 files changed, 68 insertions(+), 34 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md index be9435d63..6645dcd31 100644 --- a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md @@ -267,12 +267,11 @@ Db2 server. 
<tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. </td> </tr> </tbody> diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md index faa85c1c2..f98c7b6db 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md @@ -328,12 +328,11 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能 <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> 快照读取阶段是否先分配 UnboundedChunk。<br> 这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br> - 这是一项实验特性,默认为 false。 </td> </tr> <tr> diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index bf0415dd1..53962cd8b 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -403,12 +403,11 @@ Flink SQL> SELECT * FROM orders; <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> 快照读取阶段是否先分配 UnboundedChunk。<br> 这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br> - 这是一项实验特性,默认为 false。 </td> </tr> <tr> diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md index a8247f461..596e20c1d 100644 --- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md @@ -428,12 +428,11 @@ Connector Options <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. </td> </tr> <tr> diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index 424b7d4dc..557e1d697 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -248,12 +248,11 @@ Connector Options <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. 
</td> </tr> <tr> diff --git a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md index 28f40430b..bb43c5721 100644 --- a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md @@ -244,12 +244,11 @@ Connector Options <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. </td> </tr> <tr> diff --git a/docs/content/docs/connectors/flink-sources/db2-cdc.md b/docs/content/docs/connectors/flink-sources/db2-cdc.md index e9878db26..4ffefd8c7 100644 --- a/docs/content/docs/connectors/flink-sources/db2-cdc.md +++ b/docs/content/docs/connectors/flink-sources/db2-cdc.md @@ -267,12 +267,11 @@ Db2 server. <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. 
</td> </tr> </tbody> diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md index 5d6e9bd77..98353a3b8 100644 --- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md @@ -353,12 +353,11 @@ Connector Options <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. </td> </tr> <tr> diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 59d962e69..2752298c2 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -428,12 +428,11 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. 
</td> </tr> <tr> diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md b/docs/content/docs/connectors/flink-sources/oracle-cdc.md index d5b29d34e..596c908b4 100644 --- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md +++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md @@ -429,12 +429,11 @@ Connector Options <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. </td> </tr> <tr> diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 91e7c13cb..11b0a275f 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -245,12 +245,11 @@ SELECT * FROM shipments; <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. 
</td> </tr> <tr> diff --git a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md index e4047fa4f..5d8a9156c 100644 --- a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md +++ b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md @@ -244,12 +244,11 @@ Connector Options <tr> <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td> <td>optional</td> - <td style="word-wrap: break-word;">false</td> + <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td> Whether to assign the unbounded chunks first during snapshot reading phase.<br> This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br> - Experimental option, defaults to false. </td> </tr> <tr> diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java index 0ad152e46..38d712b94 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java @@ -138,14 +138,13 @@ public class SourceOptions { .withDescription( "Whether capture the newly added tables when restoring from a savepoint/checkpoint or not, by default is false."); - @Experimental public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED = ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( - "Whether to assign the unbounded chunks first during snapshot reading 
phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); + "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk."); @Experimental public static final ConfigOption<Boolean> SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a00d6d564..a8e143f5f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -286,11 +286,10 @@ public class MySqlSourceOptions { .withDescription( "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); - @Experimental public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST = ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( - "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. 
Defaults to false."); + "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java index ee3dcdcb0..a0ff12c4f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java @@ -325,7 +325,11 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect); return readTableSnapshotSplits( - snapshotSplits, postgresSourceFetchTaskContext, 1, dataType, hooks); + reOrderSnapshotSplits(snapshotSplits), + postgresSourceFetchTaskContext, + 1, + dataType, + hooks); } } @@ -398,4 +402,20 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { snapshotSplitAssigner.close(); return snapshotSplitList; } + + // Due to the default enabling of scan.incremental.snapshot.unbounded-chunk-first.enabled, + // the split order becomes [end,null], [null,start], ... which is different from the original + // order. + // The first split in the list is actually the last unbounded split that should be at the end. + // This method adjusts the order to restore the original sequence: [null,start], ..., + // [end,null], + // ensuring the correctness of test cases. 
+ private List<SnapshotSplit> reOrderSnapshotSplits(List<SnapshotSplit> snapshotSplits) { + if (snapshotSplits.size() > 1) { + SnapshotSplit firstSplit = snapshotSplits.get(0); + snapshotSplits.remove(0); + snapshotSplits.add(firstSplit); + } + return snapshotSplits; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java index ae6fe0f3b..09008a3d0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java @@ -125,7 +125,11 @@ class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase { List<String> actual = readTableSnapshotSplits( - snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks); + reOrderSnapshotSplits(snapshotSplits), + sqlServerSourceFetchTaskContext, + 1, + dataType, + hooks); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -190,7 +194,11 @@ class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase { List<String> actual = readTableSnapshotSplits( - snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks); + reOrderSnapshotSplits(snapshotSplits), + sqlServerSourceFetchTaskContext, + 1, + dataType, + hooks); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -275,7 +283,11 @@ class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase { List<String> actual = readTableSnapshotSplits( - snapshotSplits, sqlServerSourceFetchTaskContext, 1, 
dataType, hooks); + reOrderSnapshotSplits(snapshotSplits), + sqlServerSourceFetchTaskContext, + 1, + dataType, + hooks); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -359,4 +371,20 @@ class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase { } return true; } + + // Due to the default enabling of scan.incremental.snapshot.unbounded-chunk-first.enabled, + // the split order becomes [end,null], [null,start], ... which is different from the original + // order. + // The first split in the list is actually the last unbounded split that should be at the end. + // This method adjusts the order to restore the original sequence: [null,start], ..., + // [end,null], + // ensuring the correctness of test cases. + private List<SnapshotSplit> reOrderSnapshotSplits(List<SnapshotSplit> snapshotSplits) { + if (snapshotSplits.size() > 1) { + SnapshotSplit firstSplit = snapshotSplits.get(0); + snapshotSplits.remove(0); + snapshotSplits.add(firstSplit); + } + return snapshotSplits; + } }