This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new d9890a416 [hotfix][minor] Correct variables name according to 
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED option name
d9890a416 is described below

commit d9890a416ebfdbbbd3f229c2cfb4f0986852a1df
Author: Kunni <[email protected]>
AuthorDate: Wed Mar 12 13:39:51 2025 +0800

    [hotfix][minor] Correct variables name according to 
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED option name
    
    This closes #3943
---
 .../connectors/mysql/factory/MySqlDataSourceFactory.java  |  6 +++---
 .../connectors/mysql/source/MySqlDataSourceOptions.java   | 13 +++++++------
 .../mysql/source/MySqlDataSourceFactoryTest.java          |  4 ++--
 .../connectors/base/config/JdbcSourceConfigFactory.java   |  3 ++-
 .../flink/cdc/connectors/base/options/SourceOptions.java  | 13 +++++++------
 .../flink/cdc/connectors/db2/table/Db2TableSource.java    |  6 ++++--
 .../cdc/connectors/db2/table/Db2TableSourceFactory.java   |  6 +++---
 .../connectors/db2/table/Db2TableSourceFactoryTest.java   |  5 +++--
 .../cdc/connectors/mongodb/table/MongoDBTableSource.java  |  6 ++++--
 .../mongodb/table/MongoDBTableSourceFactory.java          |  6 +++---
 .../connectors/mongodb/table/MongoDBTableFactoryTest.java |  8 +++++---
 .../cdc/connectors/mysql/table/MySqlTableSource.java      |  6 ++++--
 .../mysql/table/MySqlTableSourceFactoryTest.java          |  1 +
 .../cdc/connectors/oracle/table/OracleTableSource.java    |  6 ++++--
 .../connectors/oracle/table/OracleTableSourceFactory.java |  6 +++---
 .../oracle/table/OracleTableSourceFactoryTest.java        | 15 +++++++++------
 .../connectors/postgres/table/PostgreSQLTableFactory.java |  6 +++---
 .../connectors/postgres/table/PostgreSQLTableSource.java  |  6 ++++--
 .../postgres/table/PostgreSQLTableFactoryTest.java        | 12 ++++++------
 .../connectors/sqlserver/table/SqlServerTableFactory.java |  6 +++---
 .../connectors/sqlserver/table/SqlServerTableSource.java  |  6 ++++--
 .../sqlserver/table/SqlServerTableFactoryTest.java        |  8 ++++----
 22 files changed, 88 insertions(+), 66 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index de47d2367..cbc996aff 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -76,9 +76,9 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
@@ -152,7 +152,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         boolean isParsingOnLineSchemaChanges = 
config.get(PARSE_ONLINE_SCHEMA_CHANGES);
         boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
         boolean isAssignUnboundedChunkFirst =
-                
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+                
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -341,7 +341,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         options.add(USE_LEGACY_JSON_FORMAT);
         options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
         options.add(PARSE_ONLINE_SCHEMA_CHANGES);
-        options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index bb7d7b866..2ffad8b2e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -315,10 +315,11 @@ public class MySqlDataSourceOptions {
                             "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
 
     @Experimental
-    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
-            
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to assign the unbounded chunks first 
during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.  Defaults to false.");
+    public static final ConfigOption<Boolean>
+            SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
+                    
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to assign the unbounded chunks 
first during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.  Defaults to false.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index 86bd01d75..a8b2126f9 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -41,8 +41,8 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
-import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
@@ -261,7 +261,7 @@ public class MySqlDataSourceFactoryTest extends 
MySqlSourceTestBase {
         // optional option
         options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
         options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true");
-        options.put(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.key(), 
"true");
+        
options.put(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.key(), 
"true");
 
         Factory.Context context = new 
MockContext(Configuration.fromMap(options));
         MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 3dffd13ee..2ab9bb889 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -61,7 +61,8 @@ public abstract class JdbcSourceConfigFactory implements 
Factory<JdbcSourceConfi
     protected boolean scanNewlyAddedTableEnabled =
             JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
     protected boolean assignUnboundedChunkFirst =
-            
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue();
+            
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
+                    .defaultValue();
 
     /** Integer port number of the database server. */
     public JdbcSourceConfigFactory hostname(String hostname) {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
index 9ff268cad..4318185b9 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
@@ -139,10 +139,11 @@ public class SourceOptions {
                             "Whether capture the newly added tables when 
restoring from a savepoint/checkpoint or not, by default is false.");
 
     @Experimental
-    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
-            
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to assign the unbounded chunks first 
during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.  Defaults to false.");
+    public static final ConfigOption<Boolean>
+            SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
+                    
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to assign the unbounded chunks 
first during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.  Defaults to false.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
index 7c6411001..eebe109e0 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
@@ -275,7 +275,8 @@ public class Db2TableSource implements ScanTableSource, 
SupportsReadingMetadata
                 && Objects.equals(distributionFactorLower, 
that.distributionFactorLower)
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
                 && Objects.equals(closeIdleReaders, that.closeIdleReaders)
-                && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill);
+                && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill)
+                && Objects.equals(assignUnboundedChunkFirst, 
that.assignUnboundedChunkFirst);
     }
 
     @Override
@@ -302,7 +303,8 @@ public class Db2TableSource implements ScanTableSource, 
SupportsReadingMetadata
                 distributionFactorLower,
                 chunkKeyColumn,
                 closeIdleReaders,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                assignUnboundedChunkFirst);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
index b3ea3189c..529a8f0bc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
@@ -40,10 +40,10 @@ import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CON
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -140,7 +140,7 @@ public class Db2TableSourceFactory implements 
DynamicTableSourceFactory {
         boolean closeIdleReaders = 
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean assignUnboundedChunkFirst =
-                
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+                
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -214,7 +214,7 @@ public class Db2TableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
-        options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
index 799964ff7..520a7c105 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
@@ -119,7 +119,7 @@ public class Db2TableSourceFactoryTest {
                         null,
                         false,
                         
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -130,6 +130,7 @@ public class Db2TableSourceFactoryTest {
         options.put("port", "50000");
         options.put("server-time-zone", "Asia/Shanghai");
         options.put("debezium.snapshot.mode", "schema_only");
+        options.put("scan.incremental.snapshot.unbounded-chunk-first.enabled", 
"true");
 
         DynamicTableSource actualSource = createTableSource(options, SCHEMA);
         Properties dbzProperties = new Properties();
@@ -249,7 +250,7 @@ public class Db2TableSourceFactoryTest {
                         null,
                         false,
                         
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
index 217a70cab..2ea676309 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
@@ -352,7 +352,8 @@ public class MongoDBTableSource implements ScanTableSource, 
SupportsReadingMetad
                 && Objects.equals(enableFullDocPrePostImage, 
that.enableFullDocPrePostImage)
                 && Objects.equals(noCursorTimeout, that.noCursorTimeout)
                 && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill)
-                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled);
+                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled)
+                && Objects.equals(assignUnboundedChunkFirst, 
that.assignUnboundedChunkFirst);
     }
 
     @Override
@@ -383,7 +384,8 @@ public class MongoDBTableSource implements ScanTableSource, 
SupportsReadingMetad
                 enableFullDocPrePostImage,
                 noCursorTimeout,
                 skipSnapshotBackfill,
-                scanNewlyAddedTableEnabled);
+                scanNewlyAddedTableEnabled,
+                assignUnboundedChunkFirst);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
index 1b40ff589..47ade2675 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
@@ -36,8 +36,8 @@ import java.util.Set;
 
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -108,7 +108,7 @@ public class MongoDBTableSourceFactory implements 
DynamicTableSourceFactory {
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean scanNewlyAddedTableEnabled = 
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         boolean assignUnboundedChunkFirst =
-                
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+                
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
         int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
@@ -232,7 +232,7 @@ public class MongoDBTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_NO_CURSOR_TIMEOUT);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
-        options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index 685f898d6..9fbe2116c 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -48,8 +48,8 @@ import java.util.Map;
 
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
 import static 
org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
@@ -158,7 +158,7 @@ public class MongoDBTableFactoryTest {
                         SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
                         SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -183,6 +183,8 @@ public class MongoDBTableFactoryTest {
         options.put("scan.newly-added-table.enabled", "true");
         options.put("scan.full-changelog", "true");
         options.put("scan.cursor.no-timeout", "false");
+        options.put("scan.incremental.snapshot.unbounded-chunk-first.enabled", 
"true");
+
         DynamicTableSource actualSource = createTableSource(SCHEMA, options);
 
         MongoDBTableSource expectedSource =
@@ -253,7 +255,7 @@ public class MongoDBTableFactoryTest {
                         SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
                         SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
 
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index f13a8a495..110075442 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -364,7 +364,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
                 && Objects.equals(skipSnapshotBackFill, 
that.skipSnapshotBackFill)
                 && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
-                && useLegacyJsonFormat == that.useLegacyJsonFormat;
+                && useLegacyJsonFormat == that.useLegacyJsonFormat
+                && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst;
     }
 
     @Override
@@ -399,7 +400,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 chunkKeyColumn,
                 skipSnapshotBackFill,
                 parseOnlineSchemaChanges,
-                useLegacyJsonFormat);
+                useLegacyJsonFormat,
+                assignUnboundedChunkFirst);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index deb06116c..0baa8f248 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -822,6 +822,7 @@ public class MySqlTableSourceFactoryTest {
         Map<String, String> properties = getAllOptions();
         properties.put("scan.parse.online.schema.changes.enabled", "true");
         properties.put("use.legacy.json.format", "true");
+        
properties.put("scan.incremental.snapshot.unbounded-chunk-first.enabled", 
"true");
 
         // validation for source
         DynamicTableSource actualSource = createTableSource(properties);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
index d9a0ab87e..2cb3e899b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
@@ -302,7 +302,8 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
                 && Objects.equals(closeIdleReaders, that.closeIdleReaders)
                 && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill)
-                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled);
+                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled)
+                && Objects.equals(assignUnboundedChunkFirst, 
that.assignUnboundedChunkFirst);
     }
 
     @Override
@@ -333,7 +334,8 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                 chunkKeyColumn,
                 closeIdleReaders,
                 skipSnapshotBackfill,
-                scanNewlyAddedTableEnabled);
+                scanNewlyAddedTableEnabled,
+                assignUnboundedChunkFirst);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
index 279dafe81..31df24ed8 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
@@ -44,10 +44,10 @@ import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCA
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
@@ -110,7 +110,7 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean scanNewlyAddedTableEnabled = 
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         boolean assignUnboundedChunkFirst =
-                
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+                
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -188,7 +188,7 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
-        options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
index c679d1e2e..3bb469f34 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
@@ -120,7 +120,7 @@ public class OracleTableSourceFactoryTest {
                         
JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
                         
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -159,7 +159,7 @@ public class OracleTableSourceFactoryTest {
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
                         
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -203,7 +203,7 @@ public class OracleTableSourceFactoryTest {
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
                         
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         true,
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -231,6 +231,9 @@ public class OracleTableSourceFactoryTest {
         
options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), 
"true");
         
options.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(), 
"true");
         options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), 
"true");
+        options.put(
+                
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.key(),
+                "true");
 
         options.put(
                 JdbcSourceOptions.CONNECT_TIMEOUT.key(),
@@ -312,7 +315,7 @@ public class OracleTableSourceFactoryTest {
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
                         
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -352,7 +355,7 @@ public class OracleTableSourceFactoryTest {
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
                         
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -396,7 +399,7 @@ public class OracleTableSourceFactoryTest {
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
                         
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index e6d4cd1ff..b9caca44c 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -43,8 +43,8 @@ import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCH
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE;
@@ -118,7 +118,7 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
         boolean isScanNewlyAddedTableEnabled = 
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         int lsnCommitCheckpointsDelay = 
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         boolean assignUnboundedChunkFirst =
-                
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+                
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -208,7 +208,7 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
-        options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index 95405df7e..05553a795 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -334,7 +334,8 @@ public class PostgreSQLTableSource implements 
ScanTableSource, SupportsReadingMe
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
                 && Objects.equals(closeIdleReaders, that.closeIdleReaders)
                 && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill)
-                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled);
+                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled)
+                && Objects.equals(assignUnboundedChunkFirst, 
that.assignUnboundedChunkFirst);
     }
 
     @Override
@@ -368,7 +369,8 @@ public class PostgreSQLTableSource implements 
ScanTableSource, SupportsReadingMe
                 chunkKeyColumn,
                 closeIdleReaders,
                 skipSnapshotBackfill,
-                scanNewlyAddedTableEnabled);
+                scanNewlyAddedTableEnabled,
+                assignUnboundedChunkFirst);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
index a340fac67..a913c40b4 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
@@ -55,9 +55,9 @@ import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CON
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -156,7 +156,7 @@ public class PostgreSQLTableFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
                         SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -203,7 +203,7 @@ public class PostgreSQLTableFactoryTest {
                         true,
                         true,
                         SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -248,7 +248,7 @@ public class PostgreSQLTableFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
                         SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
                 Arrays.asList("row_kind", "op_ts", "database_name", 
"schema_name", "table_name");
@@ -303,7 +303,7 @@ public class PostgreSQLTableFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
                         SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -348,7 +348,7 @@ public class PostgreSQLTableFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
                         SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
index 3a43fba8a..42ff761fa 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
@@ -40,10 +40,10 @@ import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CON
 import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -141,7 +141,7 @@ public class SqlServerTableFactory implements 
DynamicTableSourceFactory {
         boolean closeIdleReaders = 
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean assignUnboundedChunkFirst =
-                
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+                
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -215,7 +215,7 @@ public class SqlServerTableFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
-        options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
index d43846192..377b0d573 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
@@ -288,7 +288,8 @@ public class SqlServerTableSource implements 
ScanTableSource, SupportsReadingMet
                 && Objects.equals(distributionFactorLower, 
that.distributionFactorLower)
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
                 && Objects.equals(closeIdleReaders, that.closeIdleReaders)
-                && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill);
+                && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill)
+                && Objects.equals(assignUnboundedChunkFirst, 
that.assignUnboundedChunkFirst);
     }
 
     @Override
@@ -317,7 +318,8 @@ public class SqlServerTableSource implements 
ScanTableSource, SupportsReadingMet
                 distributionFactorLower,
                 chunkKeyColumn,
                 closeIdleReaders,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                assignUnboundedChunkFirst);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
index 5c9ac460b..1c2e539a3 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
@@ -113,7 +113,7 @@ public class SqlServerTableFactoryTest {
                         null,
                         false,
                         
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -158,7 +158,7 @@ public class SqlServerTableFactoryTest {
                         "testCol",
                         true,
                         true,
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -201,7 +201,7 @@ public class SqlServerTableFactoryTest {
                         "testCol",
                         true,
                         
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         assertEquals(expectedSource, actualSource);
     }
@@ -243,7 +243,7 @@ public class SqlServerTableFactoryTest {
                         null,
                         false,
                         
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
                                 .defaultValue());
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =

Reply via email to