This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new c6a0b6e11  [FLINK-38265] Stream Split shouldn't finish when exception occurs but END Watermark is met. (#4101)
c6a0b6e11 is described below

commit c6a0b6e110fb096e01fe158fa35b7dbad09e1b22
Author: Hongshun Wang <125648852+loserwang1...@users.noreply.github.com>
AuthorDate: Tue Sep 16 19:47:56 2025 +0800

    [FLINK-38265] Stream Split shouldn't finish when exception occurs but END Watermark is met. (#4101)
---
 .../external/IncrementalSourceStreamFetcher.java   |  19 +--
 .../mysql/debezium/reader/BinlogSplitReader.java   |  18 ++-
 .../debezium/reader/BinlogSplitReaderTest.java     |  32 +++++
 .../source/fetch/PostgresStreamFetchTask.java      |  11 +-
 .../fetch/IncrementalSourceStreamFetcherTest.java  | 138 +++++++++++++++++++++
 5 files changed, 207 insertions(+), 11 deletions(-)
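Context for the patch below: previously each reader stopped its read task in the reader thread's finally block, so a split could be torn down, and look finished, even while a readException was still pending. The commit moves the stop into the poll loop, gated on actually dequeuing the END watermark. What follows is a condensed, hypothetical sketch of that fixed control flow; Event, the queue, and the stop/emit calls are simplified stand-ins for the flink-cdc types shown in the diffs, not the project's real API.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class StreamFetcherSketch {
    static final class Event {
        final boolean endWatermark;
        final String payload;

        Event(boolean endWatermark, String payload) {
            this.endWatermark = endWatermark;
            this.payload = payload;
        }
    }

    private final Queue<Event> queue = new ArrayDeque<>();
    private volatile Throwable readException; // set by the reader thread on failure
    private volatile boolean taskRunning = true;

    List<String> pollSplitRecords() {
        // 1. A pending failure always wins: rethrow before touching the queue.
        if (readException != null) {
            throw new RuntimeException("Read split error", readException);
        }
        List<String> records = new ArrayList<>();
        Event event;
        while (taskRunning && (event = queue.poll()) != null) {
            // 2. Stop only when the poller itself dequeues the END watermark,
            //    not in the reader thread's finally block as before.
            if (event.endWatermark) {
                taskRunning = false; // stopReadTask(): the split is genuinely done
                break;
            }
            records.add(event.payload); // shouldEmit() filtering elided
        }
        return records;
    }
}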
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 61c2b9ffb..754c8aa1f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -47,6 +47,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
+
 /** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */
 public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceStreamFetcher.class);
@@ -96,12 +98,6 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
                                     currentStreamSplit),
                             e);
                     readException = e;
-            } finally {
-                try {
-                    stopReadTask();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
             }
         });
     }
@@ -116,10 +112,19 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
+        // what happens if currentTaskRunning
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
-                if (shouldEmit(event.getRecord())) {
+                if (isEndWatermarkEvent(event.getRecord())) {
+                    LOG.info("Read split {} end watermark event", currentStreamSplit);
+                    try {
+                        stopReadTask();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                } else if (shouldEmit(event.getRecord())) {
                     sourceRecords.add(event.getRecord());
                 } else {
                     LOG.debug("{} data change event should not emit", event);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 87a435ff6..dfc639e84 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -64,6 +64,7 @@ import java.util.function.Predicate;
 
 import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
 import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
+import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isEndWatermarkEvent;
 
 /**
  * A Debezium binlog reader implementation that also support reads binlog and filter overlapping
@@ -148,8 +149,6 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
                                     currentBinlogSplit),
                             t);
                     readException = t;
-                } finally {
-                    stopBinlogReadTask();
                 }
             });
     }
@@ -167,6 +166,16 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
+                if (isEndWatermarkEvent(event.getRecord())) {
+                    LOG.info("Read split {} end watermark event", currentBinlogSplit);
+                    try {
+                        stopBinlogReadTask();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                }
+
                 if (isParsingOnLineSchemaChanges) {
                     Optional<SourceRecord> oscRecord =
                             parseOnLineSchemaChangeEvent(event.getRecord());
@@ -398,4 +407,9 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
     MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
         return binlogSplitReadTask;
     }
+
+    @VisibleForTesting
+    public StoppableChangeEventSourceContext getChangeEventSourceContext() {
+        return changeEventSourceContext;
+    }
 }
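The two new tests below assert the same contract from the outside: an injected producer throwable must surface as the root cause of the next poll, even though stopping the change event source also makes the END watermark arrive. A minimal AssertJ sketch of that assertion shape follows; the wrapper exception and its "Read split error" message are illustrative stand-ins, not the connectors' exact text.

import static org.assertj.core.api.Assertions.assertThatThrownBy;

class PollFailureAssertionSketch {
    // Stand-in for pollSplitRecords() with a pending readException: the poll
    // rethrows the original failure wrapped, so tests inspect the root cause.
    static Object poll() {
        throw new RuntimeException(
                "Read split error", new RuntimeException("Test read with exception"));
    }

    public static void main(String[] args) {
        assertThatThrownBy(PollFailureAssertionSketch::poll)
                .rootCause()
                .isExactlyInstanceOf(RuntimeException.class)
                .hasMessage("Test read with exception");
        System.out.println("poll failed as expected");
    }
}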
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index e9205d2c5..0a6417b3e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -90,6 +90,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.get
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
 import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader}. */
 class BinlogSplitReaderTest extends MySqlSourceTestBase {
@@ -1119,6 +1120,37 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
         Assertions.assertThat(sourceRecords).isEmpty();
     }
 
+    @Test
+    void testReadBinlogWithException() throws Exception {
+        customerDatabase.createAndInitialize();
+        MySqlSourceConfig sourceConfig =
+                getConfig(StartupOptions.latest(), new String[] {"customers"});
+        binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+        // Create reader and submit splits
+        StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
+        MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+        BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);
+
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        reader.submitSplit(split);
+        statefulTaskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        reader.getChangeEventSourceContext().stopChangeEventSource();
+        // wait until executor is finished.
+        Thread.sleep(500L);
+
+        assertThatThrownBy(() -> pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord))
+                .rootCause()
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasMessage("Test read with exception");
+        reader.close();
+    }
+
     private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
         return createBinlogReader(sourceConfig, false);
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
index b34db92b1..a3d28a12c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.postgres.source.fetch;
 
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -54,6 +55,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
     private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);
 
     private final StreamSplit split;
+    private final StoppableChangeEventSourceContext changeEventSourceContext;
 
     private volatile boolean taskRunning = false;
     private volatile boolean stopped = false;
@@ -63,6 +65,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
 
     public PostgresStreamFetchTask(StreamSplit streamSplit) {
         this.split = streamSplit;
+        this.changeEventSourceContext = new StoppableChangeEventSourceContext();
     }
 
     @Override
@@ -92,8 +95,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
                         sourceFetchContext.getTaskContext(),
                         sourceFetchContext.getReplicationConnection(),
                         split);
-        StoppableChangeEventSourceContext changeEventSourceContext =
-                new StoppableChangeEventSourceContext();
+
         streamSplitReadTask.execute(
                 changeEventSourceContext,
                 sourceFetchContext.getPartition(),
@@ -162,6 +164,11 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
         }
     }
 
+    @VisibleForTesting
+    StoppableChangeEventSourceContext getChangeEventSourceContext() {
+        return changeEventSourceContext;
+    }
+
     /** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
     public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
         private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
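The Postgres change above is mostly a testability refactor: the StoppableChangeEventSourceContext, previously a local variable inside execute(), becomes a constructor-initialized field exposed through a @VisibleForTesting getter so the new test below can stop the source. A minimal sketch of that pattern, with StoppableCtx as a hypothetical stand-in for the real class:

class FetchTaskRefactorSketch {
    static final class StoppableCtx {
        private volatile boolean running = true;

        boolean isRunning() {
            return running;
        }

        void stopChangeEventSource() {
            running = false;
        }
    }

    // before: the context was created as a local inside execute(),
    // invisible to tests; now it is a long-lived field
    private final StoppableCtx changeEventSourceContext = new StoppableCtx();

    void execute() {
        while (changeEventSourceContext.isRunning()) {
            // read streaming changes ...
            break; // placeholder so the sketch terminates
        }
    }

    // package-private accessor, annotated @VisibleForTesting in the real code
    StoppableCtx getChangeEventSourceContext() {
        return changeEventSourceContext;
    }
}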
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
new file mode 100644
index 000000000..fa0fe8773
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.fetch;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link IncrementalSourceStreamFetcher }. */
+public class IncrementalSourceStreamFetcherTest extends PostgresTestBase {
+
+    private static final String schemaName = "customer";
+    private static final String tableName = "Customers";
+
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres",
+                    "customer",
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    @Test
+    void testReadStreamSplitWithException() throws Exception {
+        customDatabase.createAndInitialize();
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(customDatabase, schemaName, tableName, 10, true);
+        sourceConfigFactory.startupOptions(StartupOptions.latest());
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfigFactory.create(0));
+
+        // Create reader and submit splits
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceStreamFetcher fetcher = new IncrementalSourceStreamFetcher(taskContext, 0);
+        StreamSplit split = createStreamSplit(sourceConfig, dialect);
+        PostgresStreamFetchTask fetchTask =
+                (PostgresStreamFetchTask) dialect.createFetchTask(split);
+        StoppableChangeEventSourceContext changeEventSourceContext =
+                fetchTask.getChangeEventSourceContext();
+
+        fetcher.submitTask(fetchTask);
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        taskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        changeEventSourceContext.stopChangeEventSource();
+
+        // Wait for the task to complete
+        Thread.sleep(500L);
+
+        assertThatThrownBy(
+                        () -> pollRecordsFromReader(fetcher, SourceRecordUtils::isDataChangeRecord))
+                .rootCause()
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasMessage("Test read with exception");
+        fetcher.close();
+    }
+
+    private StreamSplit createStreamSplit(
+            PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws Exception {
+        StreamSplitAssigner streamSplitAssigner =
+                new StreamSplitAssigner(
+                        sourceConfig,
+                        dialect,
+                        new PostgresOffsetFactory(),
+                        new MockSplitEnumeratorContext<>(1));
+        streamSplitAssigner.open();
+
+        Map<TableId, TableChanges.TableChange> tableSchemas =
+                dialect.discoverDataCollectionSchemas(sourceConfig);
+        return StreamSplit.fillTableSchemas(streamSplitAssigner.createStreamSplit(), tableSchemas);
+    }
+
+    private List<SourceRecord> pollRecordsFromReader(
+            IncrementalSourceStreamFetcher fetcher, Predicate<SourceRecord> filter) {
+        List<SourceRecord> records = new ArrayList<>();
+        Iterator<SourceRecords> recordIterator;
+        try {
+            recordIterator = fetcher.pollSplitRecords();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Polling action was interrupted", e);
+        }
+        if (recordIterator == null) {
+            return records;
+        }
+        while (recordIterator.hasNext()) {
+            Iterator<SourceRecord> iterator = recordIterator.next().iterator();
+            while (iterator.hasNext()) {
+                SourceRecord record = iterator.next();
+                if (filter.test(record)) {
+                    records.add(record);
+                }
+            }
+        }
+        LOG.debug("Records polled: {}", records);
+        return records;
+    }
+}