This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 4bab36707 [FLINK-38618][base] Fix offset error due to duplicated stream split during TM failover if startup mode is latest-offset
4bab36707 is described below
commit 4bab36707b781d2221d4d5c6b515b074b72789ec
Author: Xin Gong <[email protected]>
AuthorDate: Tue Nov 18 09:53:31 2025 +0800
[FLINK-38618][base] Fix offset error due to duplicated stream split during TM failover if startup mode is latest-offset
This closes #4169.
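[Editor's note] For context before the patch: a minimal sketch of the guard this commit moves into both enumerators. Splits returned by a failed reader are forwarded to the assigner only when any remain after the enumerator has reclaimed the stream/binlog split itself; forwarding an empty collection previously caused the assigner to re-create the stream split, producing a duplicate with a wrong offset under latest-offset startup. The interface and class names below are simplified stand-ins for illustration, not the actual Flink CDC classes.

    import java.util.Collection;

    class EnumeratorSketch<T> {
        // Stand-in for the real split assigner abstraction.
        interface SplitAssigner<S> {
            void addSplits(Collection<S> splits);
        }

        private final SplitAssigner<T> splitAssigner;

        EnumeratorSketch(SplitAssigner<T> splitAssigner) {
            this.splitAssigner = splitAssigner;
        }

        // Core of the fix: only non-empty remainders reach the assigner,
        // so an already-reclaimed stream split is not re-created.
        void addSplitsBack(Collection<T> splits) {
            if (splits != null && !splits.isEmpty()) {
                splitAssigner.addSplits(splits);
            }
        }
    }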
---
.../enumerator/IncrementalSourceEnumerator.java | 5 +-
.../source/assigners/MySqlBinlogSplitAssigner.java | 7 +-
.../source/enumerator/MySqlSourceEnumerator.java | 5 +-
.../sqlserver/source/SqlServerSourceITCase.java | 137 +++++++++++++++++++--
4 files changed, 140 insertions(+), 14 deletions(-)
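[Editor's note] The SqlServerSourceITCase changes below also add a bounded hasNext() check, so the assertion that no duplicate change events trail the expected rows cannot hang on an unbounded CDC stream. A self-contained sketch of that idiom, with illustrative names rather than the test's own helpers:

    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class BoundedHasNext {
        // True only if the iterator yields another element within the
        // timeout; a still-blocked hasNext() counts as "no more data".
        static boolean hasNextWithin(Iterator<?> iterator, long seconds)
                throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                FutureTask<Boolean> future = new FutureTask<>(iterator::hasNext);
                executor.execute(future);
                return future.get(seconds, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                return false;
            } finally {
                executor.shutdownNow();
            }
        }
    }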
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index acefcf085..a9a8c34db 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -40,6 +40,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
@@ -126,7 +127,9 @@ public class IncrementalSourceEnumerator
LOG.info("The enumerator adds add stream split back: {}", streamSplit);
this.streamSplitTaskId = null;
}
- splitAssigner.addSplits(splits);
+ if (!CollectionUtil.isNullOrEmpty(splits)) {
+ splitAssigner.addSplits(splits);
+ }
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
index aa35e47a6..b8d1f54fc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
@@ -25,7 +25,6 @@ import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
-import org.apache.flink.util.CollectionUtil;
import java.io.IOException;
import java.util.ArrayList;
@@ -90,10 +89,8 @@ public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
@Override
public void addSplits(Collection<MySqlSplit> splits) {
- if (!CollectionUtil.isNullOrEmpty(splits)) {
- // we don't store the split, but will re-create binlog split later
- isBinlogSplitAssigned = false;
- }
+ // we don't store the split, but will re-create binlog split later
+ isBinlogSplitAssigned = false;
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index bd2c88864..0451db76a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -40,6 +40,7 @@ import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
@@ -126,7 +127,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
LOG.info("The enumerator adds add binlog split back: {}", binlogSplit);
this.binlogSplitTaskId = null;
}
- splitAssigner.addSplits(splits);
+ if (!CollectionUtil.isNullOrEmpty(splits)) {
+ splitAssigner.addSplits(splits);
+ }
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index d92ce7cc9..099878e83 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.CloseableIterator;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.lang3.StringUtils;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -48,11 +49,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
@@ -67,6 +74,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
+ private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
@Test
void testReadSingleTableWithSingleParallelism() throws Exception {
@@ -111,6 +119,26 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"dbo.customers"});
}
+ @Test
+ public void testJobManagerFailoverFromLatestOffset() throws Exception {
+ testSqlServerParallelSource(
+ DEFAULT_PARALLELISM,
+ "latest-offset",
+ FailoverType.JM,
+ FailoverPhase.STREAM,
+ new String[] {"dbo.customers"});
+ }
+
+ @Test
+ public void testTaskManagerFailoverFromLatestOffset() throws Exception {
+ testSqlServerParallelSource(
+ DEFAULT_PARALLELISM,
+ "latest-offset",
+ FailoverType.TM,
+ FailoverPhase.STREAM,
+ new String[] {"dbo.customers"});
+ }
+
@Test
void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
testSqlServerParallelSource(
@@ -375,12 +403,14 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
private void testSqlServerParallelSource(
int parallelism,
+ String scanStartupMode,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
testSqlServerParallelSource(
parallelism,
+ scanStartupMode,
failoverType,
failoverPhase,
captureCustomerTables,
@@ -393,6 +423,43 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
+ String[] captureCustomerTables)
+ throws Exception {
+ testSqlServerParallelSource(
+ parallelism,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ false,
+ RestartStrategies.fixedDelayRestart(1, 0),
+ null);
+ }
+
+ private void testSqlServerParallelSource(
+ int parallelism,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables,
+ boolean skipSnapshotBackfill,
+ RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
+ String chunkColumn)
+ throws Exception {
+ testSqlServerParallelSource(
+ parallelism,
+ DEFAULT_SCAN_STARTUP_MODE,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ skipSnapshotBackfill,
+ restartStrategyConfiguration,
+ chunkColumn);
+ }
+
+ private void testSqlServerParallelSource(
+ int parallelism,
+ String scanStartupMode,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
String[] captureCustomerTables,
boolean skipSnapshotBackfill,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
@@ -418,6 +485,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ + " 'scan.startup.mode' = '%s',"
+ " 'connector' = 'sqlserver-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
@@ -430,6 +498,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
+ " 'scan.incremental.snapshot.backfill.skip'
= '%s'"
+ "%s"
+ ")",
+ scanStartupMode,
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
@@ -442,8 +511,26 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
: ",'scan.incremental.snapshot.chunk.key-column'='"
+ chunkColumn
+ "'");
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from customers");
// first step: check the snapshot data
+ if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+ checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
+ }
+
+ // second step: check the binlog data
+ checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);
+
+ tableResult.getJobClient().get().cancel().get();
+ }
+
+ private void checkSnapshotData(
+ TableResult tableResult,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables)
+ throws Exception {
String[] snapshotForSingleTable =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
@@ -468,15 +555,15 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
- tEnv.executeSql(sourceDDL);
- TableResult tableResult = tEnv.executeSql("select * from customers");
- CloseableIterator<Row> iterator = tableResult.collect();
- JobID jobId = tableResult.getJobClient().get().getJobID();
+
List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+
// trigger failover after some snapshot splits read finished
if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
triggerFailover(
@@ -486,20 +573,35 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
() -> sleepMs(100));
}
- LOG.info("snapshot data start");
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ }
+
+ private void checkBinlogData(
+ TableResult tableResult,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables)
+ throws Exception {
+ String databaseName = "customer";
+ waitUntilJobRunning(tableResult);
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
- // second step: check the change stream data
for (String tableId : captureCustomerTables) {
makeFirstPartChangeStreamEvents(databaseName + "." + tableId);
}
+
+ // wait for the binlog reading
+ Thread.sleep(2000L);
+
if (failoverPhase == FailoverPhase.STREAM) {
triggerFailover(
failoverType,
jobId,
miniClusterResource.get().getMiniCluster(),
() -> sleepMs(200));
+ waitUntilJobRunning(tableResult);
}
for (String tableId : captureCustomerTables) {
makeSecondPartBinlogEvents(databaseName + "." + tableId);
@@ -524,7 +626,28 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
}
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
- tableResult.getJobClient().get().cancel().get();
+ Assertions.assertThat(hasNextData(iterator)).isFalse();
+ }
+
+ private void waitUntilJobRunning(TableResult tableResult)
+ throws InterruptedException, ExecutionException {
+ do {
+ Thread.sleep(5000L);
+ } while (tableResult.getJobClient().get().getJobStatus().get() != RUNNING);
+ }
+
+ private boolean hasNextData(final CloseableIterator<?> iterator)
+ throws InterruptedException, ExecutionException {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ FutureTask<Boolean> future = new FutureTask(iterator::hasNext);
+ executor.execute(future);
+ return future.get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ return false;
+ } finally {
+ executor.shutdown();
+ }
}
private void makeFirstPartChangeStreamEvents(String tableId) {