This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f251898 [FLINK-37828] Enable 
scan.incremental.snapshot.unbounded-chunk-first by default for improved 
stability (#4082)
81f251898 is described below

commit 81f251898e326edddebedcb3c709bf702a3b3c6f
Author: Junbo Wang <beryllw...@gmail.com>
AuthorDate: Fri Aug 8 11:49:25 2025 +0800

    [FLINK-37828] Enable scan.incremental.snapshot.unbounded-chunk-first by 
default for improved stability (#4082)
---
 .../docs/connectors/flink-sources/db2-cdc.md       |  3 +-
 .../docs/connectors/flink-sources/mongodb-cdc.md   |  3 +-
 .../docs/connectors/flink-sources/mysql-cdc.md     |  3 +-
 .../docs/connectors/flink-sources/oracle-cdc.md    |  3 +-
 .../docs/connectors/flink-sources/postgres-cdc.md  |  3 +-
 .../docs/connectors/flink-sources/sqlserver-cdc.md |  3 +-
 .../docs/connectors/flink-sources/db2-cdc.md       |  3 +-
 .../docs/connectors/flink-sources/mongodb-cdc.md   |  3 +-
 .../docs/connectors/flink-sources/mysql-cdc.md     |  3 +-
 .../docs/connectors/flink-sources/oracle-cdc.md    |  3 +-
 .../docs/connectors/flink-sources/postgres-cdc.md  |  3 +-
 .../docs/connectors/flink-sources/sqlserver-cdc.md |  3 +-
 .../cdc/connectors/base/options/SourceOptions.java |  5 ++--
 .../mysql/source/config/MySqlSourceOptions.java    |  5 ++--
 .../source/fetch/PostgresScanFetchTaskTest.java    | 22 +++++++++++++-
 .../reader/fetch/SqlServerScanFetchTaskTest.java   | 34 ++++++++++++++++++++--
 16 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md
index be9435d63..6645dcd31 100644
--- a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md
@@ -267,12 +267,11 @@ Db2 server.
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     </tbody>
diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index faa85c1c2..f98c7b6db 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -328,12 +328,11 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         快照读取阶段是否先分配 UnboundedChunk。<br>
         这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br> 
-        这是一项实验特性,默认为 false。
       </td>
     </tr>
     <tr>
diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index bf0415dd1..53962cd8b 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -403,12 +403,11 @@ Flink SQL> SELECT * FROM orders;
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         快照读取阶段是否先分配 UnboundedChunk。<br>
         这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br> 
-        这是一项实验特性,默认为 false。
       </td>
     </tr>
     <tr>
diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
index a8247f461..596e20c1d 100644
--- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -428,12 +428,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index 424b7d4dc..557e1d697 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -248,12 +248,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
index 28f40430b..bb43c5721 100644
--- a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
@@ -244,12 +244,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content/docs/connectors/flink-sources/db2-cdc.md 
b/docs/content/docs/connectors/flink-sources/db2-cdc.md
index e9878db26..4ffefd8c7 100644
--- a/docs/content/docs/connectors/flink-sources/db2-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/db2-cdc.md
@@ -267,12 +267,11 @@ Db2 server.
      <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     </tbody>
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md 
b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index 5d6e9bd77..98353a3b8 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -353,12 +353,11 @@ Connector Options
      <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 59d962e69..2752298c2 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -428,12 +428,11 @@ Only valid for cdc 1.x version. During a snapshot 
operation, the connector will
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md 
b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
index d5b29d34e..596c908b4 100644
--- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -429,12 +429,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md 
b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index 91e7c13cb..11b0a275f 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -245,12 +245,11 @@ SELECT * FROM shipments;
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md 
b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
index e4047fa4f..5d8a9156c 100644
--- a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
@@ -244,12 +244,11 @@ Connector Options
     <tr>
       <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>
         Whether to assign the unbounded chunks first during snapshot reading 
phase.<br>
         This might help reduce the risk of the TaskManager experiencing an 
out-of-memory (OOM) error when taking a snapshot of the largest unbounded 
chunk.<br> 
-        Experimental option, defaults to false.
       </td>
     </tr>
     <tr>
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
index 0ad152e46..38d712b94 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
@@ -138,14 +138,13 @@ public class SourceOptions {
                     .withDescription(
                             "Whether capture the newly added tables when 
restoring from a savepoint/checkpoint or not, by default is false.");
 
-    @Experimental
     public static final ConfigOption<Boolean>
             SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
                     
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
                             .booleanType()
-                            .defaultValue(false)
+                            .defaultValue(true)
                             .withDescription(
-                                    "Whether to assign the unbounded chunks 
first during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.  Defaults to false.");
+                                    "Whether to assign the unbounded chunks 
first during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.");
 
     @Experimental
     public static final ConfigOption<Boolean> 
SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index a00d6d564..a8e143f5f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -286,11 +286,10 @@ public class MySqlSourceOptions {
                     .withDescription(
                             "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
 
-    @Experimental
     public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST =
             
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)
                     .withDescription(
-                            "Whether to assign the unbounded chunks first 
during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk. Defaults to false.");
+                            "Whether to assign the unbounded chunks first 
during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index ee3dcdcb0..a0ff12c4f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -325,7 +325,11 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
                     new PostgresSourceFetchTaskContext(sourceConfig, 
postgresDialect);
 
             return readTableSnapshotSplits(
-                    snapshotSplits, postgresSourceFetchTaskContext, 1, 
dataType, hooks);
+                    reOrderSnapshotSplits(snapshotSplits),
+                    postgresSourceFetchTaskContext,
+                    1,
+                    dataType,
+                    hooks);
         }
     }
 
@@ -398,4 +402,20 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
         snapshotSplitAssigner.close();
         return snapshotSplitList;
     }
+
+    // Due to the default enabling of 
scan.incremental.snapshot.unbounded-chunk-first.enabled,
+    // the split order becomes [end,null], [null,start], ... which is 
different from the original
+    // order.
+    // The first split in the list is actually the last unbounded split that 
should be at the end.
+    // This method adjusts the order to restore the original sequence: 
[null,start], ...,
+    // [end,null],
+    // ensuring the correctness of test cases.
+    private List<SnapshotSplit> reOrderSnapshotSplits(List<SnapshotSplit> 
snapshotSplits) {
+        if (snapshotSplits.size() > 1) {
+            SnapshotSplit firstSplit = snapshotSplits.get(0);
+            snapshotSplits.remove(0);
+            snapshotSplits.add(firstSplit);
+        }
+        return snapshotSplits;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
index ae6fe0f3b..09008a3d0 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
@@ -125,7 +125,11 @@ class SqlServerScanFetchTaskTest extends 
SqlServerSourceTestBase {
 
         List<String> actual =
                 readTableSnapshotSplits(
-                        snapshotSplits, sqlServerSourceFetchTaskContext, 1, 
dataType, hooks);
+                        reOrderSnapshotSplits(snapshotSplits),
+                        sqlServerSourceFetchTaskContext,
+                        1,
+                        dataType,
+                        hooks);
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
@@ -190,7 +194,11 @@ class SqlServerScanFetchTaskTest extends 
SqlServerSourceTestBase {
 
         List<String> actual =
                 readTableSnapshotSplits(
-                        snapshotSplits, sqlServerSourceFetchTaskContext, 1, 
dataType, hooks);
+                        reOrderSnapshotSplits(snapshotSplits),
+                        sqlServerSourceFetchTaskContext,
+                        1,
+                        dataType,
+                        hooks);
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
@@ -275,7 +283,11 @@ class SqlServerScanFetchTaskTest extends 
SqlServerSourceTestBase {
 
         List<String> actual =
                 readTableSnapshotSplits(
-                        snapshotSplits, sqlServerSourceFetchTaskContext, 1, 
dataType, hooks);
+                        reOrderSnapshotSplits(snapshotSplits),
+                        sqlServerSourceFetchTaskContext,
+                        1,
+                        dataType,
+                        hooks);
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
@@ -359,4 +371,20 @@ class SqlServerScanFetchTaskTest extends 
SqlServerSourceTestBase {
         }
         return true;
     }
+
+    // Due to the default enabling of 
scan.incremental.snapshot.unbounded-chunk-first.enabled,
+    // the split order becomes [end,null], [null,start], ... which is 
different from the original
+    // order.
+    // The first split in the list is actually the last unbounded split that 
should be at the end.
+    // This method adjusts the order to restore the original sequence: 
[null,start], ...,
+    // [end,null],
+    // ensuring the correctness of test cases.
+    private List<SnapshotSplit> reOrderSnapshotSplits(List<SnapshotSplit> 
snapshotSplits) {
+        if (snapshotSplits.size() > 1) {
+            SnapshotSplit firstSplit = snapshotSplits.get(0);
+            snapshotSplits.remove(0);
+            snapshotSplits.add(firstSplit);
+        }
+        return snapshotSplits;
+    }
 }

Reply via email to