This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new d9890a416 [hotfix][minor] Correct variable names according to
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED option name
d9890a416 is described below
commit d9890a416ebfdbbbd3f229c2cfb4f0986852a1df
Author: Kunni <[email protected]>
AuthorDate: Wed Mar 12 13:39:51 2025 +0800
[hotfix][minor] Correct variable names according to
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED option name
This closes #3943
---
.../connectors/mysql/factory/MySqlDataSourceFactory.java | 6 +++---
.../connectors/mysql/source/MySqlDataSourceOptions.java | 13 +++++++------
.../mysql/source/MySqlDataSourceFactoryTest.java | 4 ++--
.../connectors/base/config/JdbcSourceConfigFactory.java | 3 ++-
.../flink/cdc/connectors/base/options/SourceOptions.java | 13 +++++++------
.../flink/cdc/connectors/db2/table/Db2TableSource.java | 6 ++++--
.../cdc/connectors/db2/table/Db2TableSourceFactory.java | 6 +++---
.../connectors/db2/table/Db2TableSourceFactoryTest.java | 5 +++--
.../cdc/connectors/mongodb/table/MongoDBTableSource.java | 6 ++++--
.../mongodb/table/MongoDBTableSourceFactory.java | 6 +++---
.../connectors/mongodb/table/MongoDBTableFactoryTest.java | 8 +++++---
.../cdc/connectors/mysql/table/MySqlTableSource.java | 6 ++++--
.../mysql/table/MySqlTableSourceFactoryTest.java | 1 +
.../cdc/connectors/oracle/table/OracleTableSource.java | 6 ++++--
.../connectors/oracle/table/OracleTableSourceFactory.java | 6 +++---
.../oracle/table/OracleTableSourceFactoryTest.java | 15 +++++++++------
.../connectors/postgres/table/PostgreSQLTableFactory.java | 6 +++---
.../connectors/postgres/table/PostgreSQLTableSource.java | 6 ++++--
.../postgres/table/PostgreSQLTableFactoryTest.java | 12 ++++++------
.../connectors/sqlserver/table/SqlServerTableFactory.java | 6 +++---
.../connectors/sqlserver/table/SqlServerTableSource.java | 6 ++++--
.../sqlserver/table/SqlServerTableFactoryTest.java | 8 ++++----
22 files changed, 88 insertions(+), 66 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index de47d2367..cbc996aff 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -76,9 +76,9 @@ import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
@@ -152,7 +152,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
boolean isParsingOnLineSchemaChanges =
config.get(PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
-
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize,
1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -341,7 +341,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
options.add(USE_LEGACY_JSON_FORMAT);
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
- options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index bb7d7b866..2ffad8b2e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -315,10 +315,11 @@ public class MySqlDataSourceOptions {
"Whether to use legacy json format. The default
value is true, which means there is no whitespace before value and after comma
in json format.");
@Experimental
- public static final ConfigOption<Boolean>
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
-
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to assign the unbounded chunks first
during snapshot reading phase. This might help reduce the risk of the
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of
the largest unbounded chunk. Defaults to false.");
+ public static final ConfigOption<Boolean>
+ SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
+
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to assign the unbounded chunks
first during snapshot reading phase. This might help reduce the risk of the
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of
the largest unbounded chunk. Defaults to false.");
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index 86bd01d75..a8b2126f9 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -41,8 +41,8 @@ import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
-import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
@@ -261,7 +261,7 @@ public class MySqlDataSourceFactoryTest extends
MySqlSourceTestBase {
// optional option
options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true");
- options.put(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.key(),
"true");
+
options.put(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.key(),
"true");
Factory.Context context = new
MockContext(Configuration.fromMap(options));
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 3dffd13ee..2ab9bb889 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -61,7 +61,8 @@ public abstract class JdbcSourceConfigFactory implements
Factory<JdbcSourceConfi
protected boolean scanNewlyAddedTableEnabled =
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
protected boolean assignUnboundedChunkFirst =
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue();
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
+ .defaultValue();
/** Integer port number of the database server. */
public JdbcSourceConfigFactory hostname(String hostname) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
index 9ff268cad..4318185b9 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java
@@ -139,10 +139,11 @@ public class SourceOptions {
"Whether capture the newly added tables when
restoring from a savepoint/checkpoint or not, by default is false.");
@Experimental
- public static final ConfigOption<Boolean>
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
-
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to assign the unbounded chunks first
during snapshot reading phase. This might help reduce the risk of the
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of
the largest unbounded chunk. Defaults to false.");
+ public static final ConfigOption<Boolean>
+ SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
+
ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to assign the unbounded chunks
first during snapshot reading phase. This might help reduce the risk of the
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of
the largest unbounded chunk. Defaults to false.");
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
index 7c6411001..eebe109e0 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java
@@ -275,7 +275,8 @@ public class Db2TableSource implements ScanTableSource,
SupportsReadingMetadata
&& Objects.equals(distributionFactorLower,
that.distributionFactorLower)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
- && Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill);
+ && Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill)
+ && Objects.equals(assignUnboundedChunkFirst,
that.assignUnboundedChunkFirst);
}
@Override
@@ -302,7 +303,8 @@ public class Db2TableSource implements ScanTableSource,
SupportsReadingMetadata
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ assignUnboundedChunkFirst);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
index b3ea3189c..529a8f0bc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java
@@ -40,10 +40,10 @@ import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CON
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -140,7 +140,7 @@ public class Db2TableSourceFactory implements
DynamicTableSourceFactory {
boolean closeIdleReaders =
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean assignUnboundedChunkFirst =
-
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -214,7 +214,7 @@ public class Db2TableSourceFactory implements
DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
- options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
index 799964ff7..520a7c105 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
@@ -119,7 +119,7 @@ public class Db2TableSourceFactoryTest {
null,
false,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -130,6 +130,7 @@ public class Db2TableSourceFactoryTest {
options.put("port", "50000");
options.put("server-time-zone", "Asia/Shanghai");
options.put("debezium.snapshot.mode", "schema_only");
+ options.put("scan.incremental.snapshot.unbounded-chunk-first.enabled",
"true");
DynamicTableSource actualSource = createTableSource(options, SCHEMA);
Properties dbzProperties = new Properties();
@@ -249,7 +250,7 @@ public class Db2TableSourceFactoryTest {
null,
false,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
index 217a70cab..2ea676309 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
@@ -352,7 +352,8 @@ public class MongoDBTableSource implements ScanTableSource,
SupportsReadingMetad
&& Objects.equals(enableFullDocPrePostImage,
that.enableFullDocPrePostImage)
&& Objects.equals(noCursorTimeout, that.noCursorTimeout)
&& Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill)
- && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled);
+ && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled)
+ && Objects.equals(assignUnboundedChunkFirst,
that.assignUnboundedChunkFirst);
}
@Override
@@ -383,7 +384,8 @@ public class MongoDBTableSource implements ScanTableSource,
SupportsReadingMetad
enableFullDocPrePostImage,
noCursorTimeout,
skipSnapshotBackfill,
- scanNewlyAddedTableEnabled);
+ scanNewlyAddedTableEnabled,
+ assignUnboundedChunkFirst);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
index 1b40ff589..47ade2675 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
@@ -36,8 +36,8 @@ import java.util.Set;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -108,7 +108,7 @@ public class MongoDBTableSourceFactory implements
DynamicTableSourceFactory {
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean scanNewlyAddedTableEnabled =
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean assignUnboundedChunkFirst =
-
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
@@ -232,7 +232,7 @@ public class MongoDBTableSourceFactory implements
DynamicTableSourceFactory {
options.add(SCAN_NO_CURSOR_TIMEOUT);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
- options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index 685f898d6..9fbe2116c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -48,8 +48,8 @@ import java.util.Map;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static
org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
@@ -158,7 +158,7 @@ public class MongoDBTableFactoryTest {
SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -183,6 +183,8 @@ public class MongoDBTableFactoryTest {
options.put("scan.newly-added-table.enabled", "true");
options.put("scan.full-changelog", "true");
options.put("scan.cursor.no-timeout", "false");
+ options.put("scan.incremental.snapshot.unbounded-chunk-first.enabled",
"true");
+
DynamicTableSource actualSource = createTableSource(SCHEMA, options);
MongoDBTableSource expectedSource =
@@ -253,7 +255,7 @@ public class MongoDBTableFactoryTest {
SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index f13a8a495..110075442 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -364,7 +364,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(skipSnapshotBackFill,
that.skipSnapshotBackFill)
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
- && useLegacyJsonFormat == that.useLegacyJsonFormat;
+ && useLegacyJsonFormat == that.useLegacyJsonFormat
+ && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst;
}
@Override
@@ -399,7 +400,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
chunkKeyColumn,
skipSnapshotBackFill,
parseOnlineSchemaChanges,
- useLegacyJsonFormat);
+ useLegacyJsonFormat,
+ assignUnboundedChunkFirst);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index deb06116c..0baa8f248 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -822,6 +822,7 @@ public class MySqlTableSourceFactoryTest {
Map<String, String> properties = getAllOptions();
properties.put("scan.parse.online.schema.changes.enabled", "true");
properties.put("use.legacy.json.format", "true");
+
properties.put("scan.incremental.snapshot.unbounded-chunk-first.enabled",
"true");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
index d9a0ab87e..2cb3e899b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
@@ -302,7 +302,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
&& Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill)
- && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled);
+ && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled)
+ && Objects.equals(assignUnboundedChunkFirst,
that.assignUnboundedChunkFirst);
}
@Override
@@ -333,7 +334,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill,
- scanNewlyAddedTableEnabled);
+ scanNewlyAddedTableEnabled,
+ assignUnboundedChunkFirst);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
index 279dafe81..31df24ed8 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
@@ -44,10 +44,10 @@ import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCA
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
@@ -110,7 +110,7 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean scanNewlyAddedTableEnabled =
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean assignUnboundedChunkFirst =
-
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -188,7 +188,7 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
- options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
index c679d1e2e..3bb469f34 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
@@ -120,7 +120,7 @@ public class OracleTableSourceFactoryTest {
JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -159,7 +159,7 @@ public class OracleTableSourceFactoryTest {
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -203,7 +203,7 @@ public class OracleTableSourceFactoryTest {
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
true,
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -231,6 +231,9 @@ public class OracleTableSourceFactoryTest {
options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(),
"true");
options.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(),
"true");
options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(),
"true");
+ options.put(
+
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.key(),
+ "true");
options.put(
JdbcSourceOptions.CONNECT_TIMEOUT.key(),
@@ -312,7 +315,7 @@ public class OracleTableSourceFactoryTest {
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -352,7 +355,7 @@ public class OracleTableSourceFactoryTest {
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -396,7 +399,7 @@ public class OracleTableSourceFactoryTest {
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index e6d4cd1ff..b9caca44c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -43,8 +43,8 @@ import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCH
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE;
@@ -118,7 +118,7 @@ public class PostgreSQLTableFactory implements
DynamicTableSourceFactory {
boolean isScanNewlyAddedTableEnabled =
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
int lsnCommitCheckpointsDelay =
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean assignUnboundedChunkFirst =
-
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -208,7 +208,7 @@ public class PostgreSQLTableFactory implements
DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
- options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index 95405df7e..05553a795 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -334,7 +334,8 @@ public class PostgreSQLTableSource implements
ScanTableSource, SupportsReadingMe
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
&& Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill)
- && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled);
+ && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled)
+ && Objects.equals(assignUnboundedChunkFirst,
that.assignUnboundedChunkFirst);
}
@Override
@@ -368,7 +369,8 @@ public class PostgreSQLTableSource implements
ScanTableSource, SupportsReadingMe
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill,
- scanNewlyAddedTableEnabled);
+ scanNewlyAddedTableEnabled,
+ assignUnboundedChunkFirst);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
index a340fac67..a913c40b4 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
@@ -55,9 +55,9 @@ import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CON
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -156,7 +156,7 @@ public class PostgreSQLTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -203,7 +203,7 @@ public class PostgreSQLTableFactoryTest {
true,
true,
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -248,7 +248,7 @@ public class PostgreSQLTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("row_kind", "op_ts", "database_name",
"schema_name", "table_name");
@@ -303,7 +303,7 @@ public class PostgreSQLTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -348,7 +348,7 @@ public class PostgreSQLTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
index 3a43fba8a..42ff761fa 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java
@@ -40,10 +40,10 @@ import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CON
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
-import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -141,7 +141,7 @@ public class SqlServerTableFactory implements
DynamicTableSourceFactory {
boolean closeIdleReaders =
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean assignUnboundedChunkFirst =
-
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -215,7 +215,7 @@ public class SqlServerTableFactory implements
DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
- options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
index d43846192..377b0d573 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java
@@ -288,7 +288,8 @@ public class SqlServerTableSource implements
ScanTableSource, SupportsReadingMet
&& Objects.equals(distributionFactorLower,
that.distributionFactorLower)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
- && Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill);
+ && Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill)
+ && Objects.equals(assignUnboundedChunkFirst,
that.assignUnboundedChunkFirst);
}
@Override
@@ -317,7 +318,8 @@ public class SqlServerTableSource implements
ScanTableSource, SupportsReadingMet
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ assignUnboundedChunkFirst);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
index 5c9ac460b..1c2e539a3 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java
@@ -113,7 +113,7 @@ public class SqlServerTableFactoryTest {
null,
false,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -158,7 +158,7 @@ public class SqlServerTableFactoryTest {
"testCol",
true,
true,
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -201,7 +201,7 @@ public class SqlServerTableFactoryTest {
"testCol",
true,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -243,7 +243,7 @@ public class SqlServerTableFactoryTest {
null,
false,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED
.defaultValue());
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =