This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bab36707 [FLINK-38618][base] Fix offset error due to duplicated stream split during TM failover if startup mode is latest-offset
4bab36707 is described below

commit 4bab36707b781d2221d4d5c6b515b074b72789ec
Author: Xin Gong <[email protected]>
AuthorDate: Tue Nov 18 09:53:31 2025 +0800

    [FLINK-38618][base] Fix offset error due to duplicated stream split during TM failover if startup mode is latest-offset
    
    This closes #4169.
---
 .../enumerator/IncrementalSourceEnumerator.java    |   5 +-
 .../source/assigners/MySqlBinlogSplitAssigner.java |   7 +-
 .../source/enumerator/MySqlSourceEnumerator.java   |   5 +-
 .../sqlserver/source/SqlServerSourceITCase.java    | 137 +++++++++++++++++++--
 4 files changed, 140 insertions(+), 14 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index acefcf085..a9a8c34db 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
@@ -126,7 +127,9 @@ public class IncrementalSourceEnumerator
             LOG.info("The enumerator adds add stream split back: {}", 
streamSplit);
             this.streamSplitTaskId = null;
         }
-        splitAssigner.addSplits(splits);
+        if (!CollectionUtil.isNullOrEmpty(splits)) {
+            splitAssigner.addSplits(splits);
+        }
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
index aa35e47a6..b8d1f54fc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
-import org.apache.flink.util.CollectionUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -90,10 +89,8 @@ public class MySqlBinlogSplitAssigner implements 
MySqlSplitAssigner {
 
     @Override
     public void addSplits(Collection<MySqlSplit> splits) {
-        if (!CollectionUtil.isNullOrEmpty(splits)) {
-            // we don't store the split, but will re-create binlog split later
-            isBinlogSplitAssigned = false;
-        }
+        // we don't store the split, but will re-create binlog split later
+        isBinlogSplitAssigned = false;
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index bd2c88864..0451db76a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
@@ -126,7 +127,9 @@ public class MySqlSourceEnumerator implements 
SplitEnumerator<MySqlSplit, Pendin
             LOG.info("The enumerator adds add binlog split back: {}", 
binlogSplit);
             this.binlogSplitTaskId = null;
         }
-        splitAssigner.addSplits(splits);
+        if (!CollectionUtil.isNullOrEmpty(splits)) {
+            splitAssigner.addSplits(splits);
+        }
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index d92ce7cc9..099878e83 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.CloseableIterator;
 
 import io.debezium.jdbc.JdbcConnection;
 import org.apache.commons.lang3.StringUtils;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -48,11 +49,17 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.catalog.Column.physical;
@@ -67,6 +74,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
 
     private static final int USE_POST_LOWWATERMARK_HOOK = 1;
     private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
+    private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
 
     @Test
     void testReadSingleTableWithSingleParallelism() throws Exception {
@@ -111,6 +119,26 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
                 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] 
{"dbo.customers"});
     }
 
+    @Test
+    public void testJobManagerFailoverFromLatestOffset() throws Exception {
+        testSqlServerParallelSource(
+                DEFAULT_PARALLELISM,
+                "latest-offset",
+                FailoverType.JM,
+                FailoverPhase.STREAM,
+                new String[] {"dbo.customers"});
+    }
+
+    @Test
+    public void testTaskManagerFailoverFromLatestOffset() throws Exception {
+        testSqlServerParallelSource(
+                DEFAULT_PARALLELISM,
+                "latest-offset",
+                FailoverType.TM,
+                FailoverPhase.STREAM,
+                new String[] {"dbo.customers"});
+    }
+
     @Test
     void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws 
Exception {
         testSqlServerParallelSource(
@@ -375,12 +403,14 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
 
     private void testSqlServerParallelSource(
             int parallelism,
+            String scanStartupMode,
             FailoverType failoverType,
             FailoverPhase failoverPhase,
             String[] captureCustomerTables)
             throws Exception {
         testSqlServerParallelSource(
                 parallelism,
+                scanStartupMode,
                 failoverType,
                 failoverPhase,
                 captureCustomerTables,
@@ -393,6 +423,43 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
             int parallelism,
             FailoverType failoverType,
             FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        testSqlServerParallelSource(
+                parallelism,
+                failoverType,
+                failoverPhase,
+                captureCustomerTables,
+                false,
+                RestartStrategies.fixedDelayRestart(1, 0),
+                null);
+    }
+
+    private void testSqlServerParallelSource(
+            int parallelism,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables,
+            boolean skipSnapshotBackfill,
+            RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration,
+            String chunkColumn)
+            throws Exception {
+        testSqlServerParallelSource(
+                parallelism,
+                DEFAULT_SCAN_STARTUP_MODE,
+                failoverType,
+                failoverPhase,
+                captureCustomerTables,
+                skipSnapshotBackfill,
+                restartStrategyConfiguration,
+                chunkColumn);
+    }
+
+    private void testSqlServerParallelSource(
+            int parallelism,
+            String scanStartupMode,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
             String[] captureCustomerTables,
             boolean skipSnapshotBackfill,
             RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration,
@@ -418,6 +485,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase 
{
                                 + " phone_number STRING,"
                                 + " primary key (id) not enforced"
                                 + ") WITH ("
+                                + " 'scan.startup.mode' = '%s',"
                                 + " 'connector' = 'sqlserver-cdc',"
                                 + " 'hostname' = '%s',"
                                 + " 'port' = '%s',"
@@ -430,6 +498,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase 
{
                                 + " 'scan.incremental.snapshot.backfill.skip' 
= '%s'"
                                 + "%s"
                                 + ")",
+                        scanStartupMode,
                         MSSQL_SERVER_CONTAINER.getHost(),
                         
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
                         MSSQL_SERVER_CONTAINER.getUsername(),
@@ -442,8 +511,26 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
                                 : 
",'scan.incremental.snapshot.chunk.key-column'='"
                                         + chunkColumn
                                         + "'");
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from customers");
 
         // first step: check the snapshot data
+        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+            checkSnapshotData(tableResult, failoverType, failoverPhase, 
captureCustomerTables);
+        }
+
+        // second step: check the binlog data
+        checkBinlogData(tableResult, failoverType, failoverPhase, 
captureCustomerTables);
+
+        tableResult.getJobClient().get().cancel().get();
+    }
+
+    private void checkSnapshotData(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
         String[] snapshotForSingleTable =
                 new String[] {
                     "+I[101, user_1, Shanghai, 123567891234]",
@@ -468,15 +555,15 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
                     "+I[1019, user_20, Shanghai, 123567891234]",
                     "+I[2000, user_21, Shanghai, 123567891234]"
                 };
-        tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("select * from customers");
-        CloseableIterator<Row> iterator = tableResult.collect();
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+
         List<String> expectedSnapshotData = new ArrayList<>();
         for (int i = 0; i < captureCustomerTables.length; i++) {
             expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
         }
 
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
         // trigger failover after some snapshot splits read finished
         if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
             triggerFailover(
@@ -486,20 +573,35 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
                     () -> sleepMs(100));
         }
 
-        LOG.info("snapshot data start");
         assertEqualsInAnyOrder(
                 expectedSnapshotData, fetchRows(iterator, 
expectedSnapshotData.size()));
+    }
+
+    private void checkBinlogData(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        String databaseName = "customer";
+        waitUntilJobRunning(tableResult);
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
 
-        // second step: check the change stream data
         for (String tableId : captureCustomerTables) {
             makeFirstPartChangeStreamEvents(databaseName + "." + tableId);
         }
+
+        // wait for the binlog reading
+        Thread.sleep(2000L);
+
         if (failoverPhase == FailoverPhase.STREAM) {
             triggerFailover(
                     failoverType,
                     jobId,
                     miniClusterResource.get().getMiniCluster(),
                     () -> sleepMs(200));
+            waitUntilJobRunning(tableResult);
         }
         for (String tableId : captureCustomerTables) {
             makeSecondPartBinlogEvents(databaseName + "." + tableId);
@@ -524,7 +626,28 @@ class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
             expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
         }
         assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, 
expectedBinlogData.size()));
-        tableResult.getJobClient().get().cancel().get();
+        Assertions.assertThat(hasNextData(iterator)).isFalse();
+    }
+
+    private void waitUntilJobRunning(TableResult tableResult)
+            throws InterruptedException, ExecutionException {
+        do {
+            Thread.sleep(5000L);
+        } while (tableResult.getJobClient().get().getJobStatus().get() != 
RUNNING);
+    }
+
+    private boolean hasNextData(final CloseableIterator<?> iterator)
+            throws InterruptedException, ExecutionException {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            FutureTask<Boolean> future = new FutureTask(iterator::hasNext);
+            executor.execute(future);
+            return future.get(3, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            return false;
+        } finally {
+            executor.shutdown();
+        }
     }
 
     private void makeFirstPartChangeStreamEvents(String tableId) {

Reply via email to