This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c6a0b6e11 [FLINK-38265] Stream Split shouldn't finish when an exception occurs, even if the END watermark is met. (#4101)
c6a0b6e11 is described below

commit c6a0b6e110fb096e01fe158fa35b7dbad09e1b22
Author: Hongshun Wang <125648852+loserwang1...@users.noreply.github.com>
AuthorDate: Tue Sep 16 19:47:56 2025 +0800

    [FLINK-38265] Stream Split shouldn't finish when an exception occurs, even if the END watermark is met. (#4101)
---
 .../external/IncrementalSourceStreamFetcher.java   |  19 +--
 .../mysql/debezium/reader/BinlogSplitReader.java   |  18 ++-
 .../debezium/reader/BinlogSplitReaderTest.java     |  32 +++++
 .../source/fetch/PostgresStreamFetchTask.java      |  11 +-
 .../fetch/IncrementalSourceStreamFetcherTest.java  | 138 +++++++++++++++++++++
 5 files changed, 207 insertions(+), 11 deletions(-)
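
Before this change, both stream fetchers stopped their read task in a finally block around the submitted runnable, so a task that died with an exception was stopped the same way as one that finished cleanly, and the stream split could be treated as complete. The patch instead stops the task from the poll loop, and only when the END watermark event is actually observed; a recorded readException still surfaces through checkReadException(). A condensed sketch of the resulting poll path, assembled from the hunks below (the return shape is simplified for brevity):

    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
        checkReadException(); // rethrows any failure the fetch task recorded
        final List<SourceRecord> sourceRecords = new ArrayList<>();
        if (currentTaskRunning) {
            for (DataChangeEvent event : queue.poll()) {
                if (isEndWatermarkEvent(event.getRecord())) {
                    // The stop now happens here, on the END watermark itself,
                    // rather than in a finally block that also ran on failures.
                    try {
                        stopReadTask();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    break;
                } else if (shouldEmit(event.getRecord())) {
                    sourceRecords.add(event.getRecord());
                }
            }
        }
        // Simplified: the real method wraps the records before returning them.
        return Collections.singletonList(new SourceRecords(sourceRecords)).iterator();
    }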

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 61c2b9ffb..754c8aa1f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -47,6 +47,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
+
 /** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */
 public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceStreamFetcher.class);
@@ -96,12 +98,6 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
                                         currentStreamSplit),
                                 e);
                         readException = e;
-                    } finally {
-                        try {
-                            stopReadTask();
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
                     }
                 });
     }
@@ -116,10 +112,19 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
+        // Records are only polled while the read task is still running.
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
-                if (shouldEmit(event.getRecord())) {
+                if (isEndWatermarkEvent(event.getRecord())) {
+                    LOG.info("Read split {} end watermark event", currentStreamSplit);
+                    try {
+                        stopReadTask();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                } else if (shouldEmit(event.getRecord())) {
                     sourceRecords.add(event.getRecord());
                 } else {
                     LOG.debug("{} data change event should not emit", event);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 87a435ff6..dfc639e84 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -64,6 +64,7 @@ import java.util.function.Predicate;
 
 import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
 import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
+import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isEndWatermarkEvent;
 
 /**
  * A Debezium binlog reader implementation that also supports reading the binlog and filtering overlapping
@@ -148,8 +149,6 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
                                         currentBinlogSplit),
                                 t);
                         readException = t;
-                    } finally {
-                        stopBinlogReadTask();
                     }
                 });
     }
@@ -167,6 +166,16 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
+                if (isEndWatermarkEvent(event.getRecord())) {
+                    LOG.info("Read split {} end watermark event", currentBinlogSplit);
+                    try {
+                        stopBinlogReadTask();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                }
+
                 if (isParsingOnLineSchemaChanges) {
                     Optional<SourceRecord> oscRecord =
                             parseOnLineSchemaChangeEvent(event.getRecord());
@@ -398,4 +407,9 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
     MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
         return binlogSplitReadTask;
     }
+
+    @VisibleForTesting
+    public StoppableChangeEventSourceContext getChangeEventSourceContext() {
+        return changeEventSourceContext;
+    }
 }
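
The new getChangeEventSourceContext() accessor exposes the context the reader hands to Debezium, so tests can halt the change event source directly. For reference, the helper it returns is shaped roughly like the sketch below (reconstructed from memory of flink-cdc's StoppableChangeEventSourceContext, so treat it as illustrative rather than exact):

    import io.debezium.pipeline.source.spi.ChangeEventSource;

    public class StoppableChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {

        private volatile boolean isRunning = true;

        /** Asks a running change event source to exit its event loop. */
        public void stopChangeEventSource() {
            isRunning = false;
        }

        /** Debezium checks this flag between events; false makes execute() return. */
        @Override
        public boolean isRunning() {
            return isRunning;
        }
    }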
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index e9205d2c5..0a6417b3e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -90,6 +90,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.get
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
 import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader}. */
 class BinlogSplitReaderTest extends MySqlSourceTestBase {
@@ -1119,6 +1120,37 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
         Assertions.assertThat(sourceRecords).isEmpty();
     }
 
+    @Test
+    void testReadBinlogWithException() throws Exception {
+        customerDatabase.createAndInitialize();
+        MySqlSourceConfig sourceConfig =
+                getConfig(StartupOptions.latest(), new String[] {"customers"});
+        binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+        // Create reader and submit splits
+        StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
+        MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+        BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);
+
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        reader.submitSplit(split);
+        statefulTaskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        reader.getChangeEventSourceContext().stopChangeEventSource();
+        // Give the executor time to finish the read task.
+        Thread.sleep(500L);
+
+        assertThatThrownBy(() -> pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord))
+                .rootCause()
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasMessage("Test read with exception");
+        reader.close();
+    }
+
     private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
         return createBinlogReader(sourceConfig, false);
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
index b34db92b1..a3d28a12c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.postgres.source.fetch;
 
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -54,6 +55,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
     private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);
 
     private final StreamSplit split;
+    private final StoppableChangeEventSourceContext changeEventSourceContext;
     private volatile boolean taskRunning = false;
     private volatile boolean stopped = false;
 
@@ -63,6 +65,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
 
     public PostgresStreamFetchTask(StreamSplit streamSplit) {
         this.split = streamSplit;
+        this.changeEventSourceContext = new StoppableChangeEventSourceContext();
     }
 
     @Override
@@ -92,8 +95,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
                         sourceFetchContext.getTaskContext(),
                         sourceFetchContext.getReplicationConnection(),
                         split);
-        StoppableChangeEventSourceContext changeEventSourceContext =
-                new StoppableChangeEventSourceContext();
+
         streamSplitReadTask.execute(
                 changeEventSourceContext,
                 sourceFetchContext.getPartition(),
@@ -162,6 +164,11 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
         }
     }
 
+    @VisibleForTesting
+    StoppableChangeEventSourceContext getChangeEventSourceContext() {
+        return changeEventSourceContext;
+    }
+
     /** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
     public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
         private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
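
Promoting the context from a local variable inside execute() to a field initialized in the constructor means the handle exists as soon as the task object does, so a test can stop the source no matter when (or whether) execute() runs. The resulting shape, with unrelated members omitted:

    public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {

        private final StreamSplit split;
        // Created eagerly so tests can reach it before execute() is called
        private final StoppableChangeEventSourceContext changeEventSourceContext;

        public PostgresStreamFetchTask(StreamSplit streamSplit) {
            this.split = streamSplit;
            this.changeEventSourceContext = new StoppableChangeEventSourceContext();
        }

        @VisibleForTesting
        StoppableChangeEventSourceContext getChangeEventSourceContext() {
            return changeEventSourceContext;
        }
    }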
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
new file mode 100644
index 000000000..fa0fe8773
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.fetch;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link IncrementalSourceStreamFetcher}. */
+public class IncrementalSourceStreamFetcherTest extends PostgresTestBase {
+
+    private static final String schemaName = "customer";
+    private static final String tableName = "Customers";
+
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres",
+                    "customer",
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    @Test
+    void testReadStreamSplitWithException() throws Exception {
+        customDatabase.createAndInitialize();
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(customDatabase, schemaName, tableName, 10, true);
+        sourceConfigFactory.startupOptions(StartupOptions.latest());
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfigFactory.create(0));
+
+        // Create reader and submit splits
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceStreamFetcher fetcher = new IncrementalSourceStreamFetcher(taskContext, 0);
+        StreamSplit split = createStreamSplit(sourceConfig, dialect);
+        PostgresStreamFetchTask fetchTask =
+                (PostgresStreamFetchTask) dialect.createFetchTask(split);
+        StoppableChangeEventSourceContext changeEventSourceContext =
+                fetchTask.getChangeEventSourceContext();
+
+        fetcher.submitTask(fetchTask);
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        taskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        changeEventSourceContext.stopChangeEventSource();
+
+        // Wait for the task to complete
+        Thread.sleep(500L);
+
+        assertThatThrownBy(
+                        () -> pollRecordsFromReader(fetcher, SourceRecordUtils::isDataChangeRecord))
+                .rootCause()
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasMessage("Test read with exception");
+        fetcher.close();
+    }
+
+    private StreamSplit createStreamSplit(
+            PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws Exception {
+        StreamSplitAssigner streamSplitAssigner =
+                new StreamSplitAssigner(
+                        sourceConfig,
+                        dialect,
+                        new PostgresOffsetFactory(),
+                        new MockSplitEnumeratorContext<>(1));
+        streamSplitAssigner.open();
+
+        Map<TableId, TableChanges.TableChange> tableSchemas =
+                dialect.discoverDataCollectionSchemas(sourceConfig);
+        return StreamSplit.fillTableSchemas(streamSplitAssigner.createStreamSplit(), tableSchemas);
+    }
+
+    private List<SourceRecord> pollRecordsFromReader(
+            IncrementalSourceStreamFetcher fetcher, Predicate<SourceRecord> filter) {
+        List<SourceRecord> records = new ArrayList<>();
+        Iterator<SourceRecords> recordIterator;
+        try {
+            recordIterator = fetcher.pollSplitRecords();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Polling action was interrupted", e);
+        }
+        if (recordIterator == null) {
+            return records;
+        }
+        while (recordIterator.hasNext()) {
+            Iterator<SourceRecord> iterator = recordIterator.next().iterator();
+            while (iterator.hasNext()) {
+                SourceRecord record = iterator.next();
+                if (filter.test(record)) {
+                    records.add(record);
+                }
+            }
+        }
+        LOG.debug("Records polled: {}", records);
+        return records;
+    }
+}
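
One caveat shared by both new tests: the fixed Thread.sleep(500L) assumes the executor observes the stop within half a second. If that ever proves flaky in CI, a bounded polling wait is a common alternative; a hypothetical helper (not part of this commit) could look like:

    // Hypothetical test helper, not in this commit: polls a condition every
    // 50 ms until it holds or the timeout elapses.
    private static void waitUntil(java.util.function.BooleanSupplier condition, java.time.Duration timeout)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            Thread.sleep(50L);
        }
    }

The condition would need a flag the reader already exposes (for example, whether its read task is still running), which is why these tests settle for a plain sleep.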
