This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf5a395a [FLINK-34688][cdc-connector][mysql] Make the trigger condition
for scanning newly added tables stricter
4bf5a395a is described below
commit 4bf5a395a5a8f83a5e309e5e985ad7c839b953db
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Aug 9 15:38:58 2024 +0800
[FLINK-34688][cdc-connector][mysql] Make the trigger condition for scanning
newly added tables stricter
This closes #3519.
---
.../source/assigner/SnapshotSplitAssigner.java | 3 ++-
.../assigners/MySqlSnapshotSplitAssigner.java | 3 ++-
.../assigners/MySqlSnapshotSplitAssignerTest.java | 25 ++++++++++++++++++----
3 files changed, 25 insertions(+), 6 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index cd0e77200..a3a234b73 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -203,7 +203,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig>
implements SplitAssig
private void captureNewlyAddedTables() {
if (sourceConfig.isScanNewlyAddedTableEnabled()
- && !sourceConfig.getStartupOptions().isSnapshotOnly()) {
+ && !sourceConfig.getStartupOptions().isSnapshotOnly()
+ && AssignerStatus.isAssigningFinished(assignerStatus)) {
try {
// check whether we got newly added tables
final List<TableId> currentCapturedTables =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index e209921b5..9ea69b11a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -215,7 +215,8 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
private void captureNewlyAddedTables() {
// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()
- && !sourceConfig.getStartupOptions().isSnapshotOnly()) {
+ && !sourceConfig.getStartupOptions().isSnapshotOnly()
+ && AssignerStatus.isAssigningFinished(assignerStatus)) {
// check whether we got newly added tables
try (JdbcConnection jdbc =
DebeziumUtils.openJdbcConnection(sourceConfig)) {
final List<TableId> currentCapturedTables =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 759827d9d..de875a0ed 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -484,7 +484,7 @@ public class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
}
@Test
- public void testScanNewlyAddedTableStartFromCheckpoint() {
+ public void
testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() {
List<String> expected =
Arrays.asList(
"customers_sparse_dist [109] null",
@@ -492,7 +492,24 @@ public class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
"customers_even_dist [10] [18]",
"customers_even_dist [18] null",
"customer_card_single_line null null");
- assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint());
+ assertEquals(
+ expected,
+ getTestAssignSnapshotSplitsFromCheckpoint(
+ AssignerStatus.INITIAL_ASSIGNING_FINISHED));
+ }
+
+ @Test
+ public void
testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint()
{
+ List<String> expected =
+ Arrays.asList(
+ "customers_sparse_dist [109] null",
+ "customers_even_dist null [10]",
+ "customers_even_dist [10] [18]",
+ "customers_even_dist [18] null");
+ assertEquals(
+ expected,
+ getTestAssignSnapshotSplitsFromCheckpoint(
+
AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED));
}
private List<String> getTestAssignSnapshotSplits(
@@ -536,7 +553,7 @@ public class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
return getSplitsFromAssigner(assigner);
}
- private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
+ private List<String>
getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) {
TableId newTable =
TableId.parse(customerDatabase.getDatabaseName() +
".customer_card_single_line");
TableId processedTable =
@@ -619,7 +636,7 @@ public class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
assignedSplits,
new HashMap<>(),
splitFinishedOffsets,
- AssignerStatus.INITIAL_ASSIGNING,
+ assignerStatus,
remainingTables,
false,
true,