This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf5a395a [FLINK-34688][cdc-connector][mysql] Make the trigger condition for scanning newly added tables stricter
4bf5a395a is described below

commit 4bf5a395a5a8f83a5e309e5e985ad7c839b953db
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Aug 9 15:38:58 2024 +0800

    [FLINK-34688][cdc-connector][mysql] Make the trigger condition for scanning newly added tables stricter
    
    This closes #3519.
---
 .../source/assigner/SnapshotSplitAssigner.java     |  3 ++-
 .../assigners/MySqlSnapshotSplitAssigner.java      |  3 ++-
 .../assigners/MySqlSnapshotSplitAssignerTest.java  | 25 ++++++++++++++++++----
 3 files changed, 25 insertions(+), 6 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index cd0e77200..a3a234b73 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -203,7 +203,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
 
     private void captureNewlyAddedTables() {
         if (sourceConfig.isScanNewlyAddedTableEnabled()
-                && !sourceConfig.getStartupOptions().isSnapshotOnly()) {
+                && !sourceConfig.getStartupOptions().isSnapshotOnly()
+                && AssignerStatus.isAssigningFinished(assignerStatus)) {
             try {
                 // check whether we got newly added tables
                 final List<TableId> currentCapturedTables =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index e209921b5..9ea69b11a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -215,7 +215,8 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
     private void captureNewlyAddedTables() {
         // Don't scan newly added table in snapshot mode.
         if (sourceConfig.isScanNewlyAddedTableEnabled()
-                && !sourceConfig.getStartupOptions().isSnapshotOnly()) {
+                && !sourceConfig.getStartupOptions().isSnapshotOnly()
+                && AssignerStatus.isAssigningFinished(assignerStatus)) {
             // check whether we got newly added tables
             try (JdbcConnection jdbc = 
DebeziumUtils.openJdbcConnection(sourceConfig)) {
                 final List<TableId> currentCapturedTables =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 759827d9d..de875a0ed 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -484,7 +484,7 @@ public class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
     }
 
     @Test
-    public void testScanNewlyAddedTableStartFromCheckpoint() {
+    public void 
testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() {
         List<String> expected =
                 Arrays.asList(
                         "customers_sparse_dist [109] null",
@@ -492,7 +492,24 @@ public class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         "customers_even_dist [10] [18]",
                         "customers_even_dist [18] null",
                         "customer_card_single_line null null");
-        assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint());
+        assertEquals(
+                expected,
+                getTestAssignSnapshotSplitsFromCheckpoint(
+                        AssignerStatus.INITIAL_ASSIGNING_FINISHED));
+    }
+
+    @Test
+    public void 
testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint() 
{
+        List<String> expected =
+                Arrays.asList(
+                        "customers_sparse_dist [109] null",
+                        "customers_even_dist null [10]",
+                        "customers_even_dist [10] [18]",
+                        "customers_even_dist [18] null");
+        assertEquals(
+                expected,
+                getTestAssignSnapshotSplitsFromCheckpoint(
+                        
AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED));
     }
 
     private List<String> getTestAssignSnapshotSplits(
@@ -536,7 +553,7 @@ public class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
         return getSplitsFromAssigner(assigner);
     }
 
-    private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
+    private List<String> 
getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) {
         TableId newTable =
                 TableId.parse(customerDatabase.getDatabaseName() + 
".customer_card_single_line");
         TableId processedTable =
@@ -619,7 +636,7 @@ public class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         assignedSplits,
                         new HashMap<>(),
                         splitFinishedOffsets,
-                        AssignerStatus.INITIAL_ASSIGNING,
+                        assignerStatus,
                         remainingTables,
                         false,
                         true,

Reply via email to