This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c9a273212 [FLINK-37509][base&mysql] Increment scan source no need to 
buffer data when enable skipping backfill. (#3964)
c9a273212 is described below

commit c9a273212ab04d7ce069c4b8eef8738b4c0079ca
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Apr 25 17:37:45 2025 +0800

    [FLINK-37509][base&mysql] Increment scan source no need to buffer data when 
enable skipping backfill. (#3964)
---
 .../reader/IncrementalSourceSplitReader.java       |  45 ++--
 .../external/IncrementalSourceScanFetcher.java     | 124 ++++++----
 .../mongodb/source/MongoDBFullChangelogITCase.java |   9 +
 .../mongodb/source/NewlyAddedTableITCase.java      |   2 +
 .../mysql/debezium/reader/SnapshotSplitReader.java | 154 +++++++-----
 .../mysql/source/reader/MySqlSplitReader.java      |  46 ++--
 .../mysql/source/split/MySqlRecords.java           |   7 +-
 .../connectors/mysql/source/MySqlSourceITCase.java | 262 +++++++--------------
 .../mysql/source/NewlyAddedTableITCase.java        |   2 +
 .../mysql/source/reader/MySqlSourceReaderTest.java |  44 +++-
 .../mysql/table/MySqlConnectorITCase.java          |  15 +-
 .../oracle/source/NewlyAddedTableITCase.java       |   2 +
 .../source/fetch/PostgresScanFetchTask.java        |   2 +-
 .../postgres/source/NewlyAddedTableITCase.java     |   2 +
 .../postgres/source/PostgresSourceITCase.java      |  16 +-
 .../source/fetch/PostgresScanFetchTaskTest.java    |  51 +++-
 .../source/reader/PostgresSourceReaderTest.java    | 222 +++++++++++++++--
 17 files changed, 625 insertions(+), 380 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
index ead3d01c0..84d687005 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
@@ -45,6 +45,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -92,7 +93,6 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
 
     @Override
     public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
-
         try {
             suspendStreamReaderIfNeed();
             return pollSplitRecords();
@@ -145,13 +145,13 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
         Iterator<SourceRecords> dataIt = null;
         if (currentFetcher == null) {
             // (1) Reads stream split firstly and then read snapshot split
-            if (streamSplits.size() > 0) {
+            if (!streamSplits.isEmpty()) {
                 // the stream split may come from:
                 // (a) the initial stream split
                 // (b) added back stream-split in newly added table process
                 StreamSplit nextSplit = streamSplits.poll();
                 submitStreamSplit(nextSplit);
-            } else if (snapshotSplits.size() > 0) {
+            } else if (!snapshotSplits.isEmpty()) {
                 submitSnapshotSplit(snapshotSplits.poll());
             } else {
                 LOG.info("No available split to read.");
@@ -162,19 +162,21 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
             } else {
                 currentSplitId = null;
             }
-            return dataIt == null ? finishedSplit() : forRecords(dataIt);
+            return dataIt == null ? finishedSplit(true) : 
forUnfinishedRecords(dataIt);
         } else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
-            // (2) try to switch to stream split reading util current snapshot 
split finished
             dataIt = currentFetcher.pollSplitRecords();
             if (dataIt != null) {
                 // first fetch data of snapshot split, return and emit the 
records of snapshot split
-                ChangeEventRecords records;
+                return forUnfinishedRecords(dataIt);
+            } else {
+                // (2) try to switch to stream split reading util current 
snapshot split finished
+                ChangeEventRecords finishedRecords;
                 if (context.isHasAssignedStreamSplit()) {
-                    records = forNewAddedTableFinishedSplit(currentSplitId, 
dataIt);
+                    finishedRecords = 
forNewAddedTableFinishedSplit(currentSplitId);
                     closeScanFetcher();
                     closeStreamFetcher();
                 } else {
-                    records = forRecords(dataIt);
+                    finishedRecords = finishedSplit(false);
                     SnapshotSplit nextSplit = snapshotSplits.poll();
                     if (nextSplit != null) {
                         checkState(reusedScanFetcher != null);
@@ -183,9 +185,7 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
                         closeScanFetcher();
                     }
                 }
-                return records;
-            } else {
-                return finishedSplit();
+                return finishedRecords;
             }
         } else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
             // (3) switch to snapshot split reading if there are newly added 
snapshot splits
@@ -203,7 +203,7 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
                 // null will be returned after receiving suspend stream event
                 // finish current stream split reading
                 closeStreamFetcher();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
@@ -215,9 +215,12 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
         return currentFetcher == null || currentFetcher.isFinished();
     }
 
-    private ChangeEventRecords finishedSplit() {
+    private ChangeEventRecords finishedSplit(boolean recycleScanFetcher) {
         final ChangeEventRecords finishedRecords =
                 ChangeEventRecords.forFinishedSplit(currentSplitId);
+        if (recycleScanFetcher) {
+            closeScanFetcher();
+        }
         currentSplitId = null;
         return finishedRecords;
     }
@@ -226,24 +229,16 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
      * Finishes new added snapshot split, mark the stream split as finished 
too, we will add the
      * stream split back in {@code MySqlSourceReader}.
      */
-    private ChangeEventRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
+    private ChangeEventRecords forNewAddedTableFinishedSplit(final String 
splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(STREAM_SPLIT_ID);
         currentSplitId = null;
-        return new ChangeEventRecords(splitId, recordsForSplit, 
finishedSplits);
+        return new ChangeEventRecords(splitId, Collections.emptyIterator(), 
finishedSplits);
     }
 
-    private ChangeEventRecords forRecords(Iterator<SourceRecords> dataIt) {
-        if (currentFetcher instanceof IncrementalSourceScanFetcher) {
-            final ChangeEventRecords finishedRecords =
-                    ChangeEventRecords.forSnapshotRecords(currentSplitId, 
dataIt);
-            closeScanFetcher();
-            return finishedRecords;
-        } else {
-            return ChangeEventRecords.forRecords(currentSplitId, dataIt);
-        }
+    private ChangeEventRecords forUnfinishedRecords(Iterator<SourceRecords> 
dataIt) {
+        return ChangeEventRecords.forRecords(currentSplitId, dataIt);
     }
 
     private void submitSnapshotSplit(SnapshotSplit snapshotSplit) {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
index d23d4d36c..3136bb97d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -115,64 +116,87 @@ public class IncrementalSourceScanFetcher implements 
Fetcher<SourceRecords, Sour
         checkReadException();
 
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high 
watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high 
watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (highWatermark == null && isHighWatermarkEvent(record)) 
{
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture 
stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws 
InterruptedException {
+        checkReadException();
+        List<DataChangeEvent> batch = queue.poll();
+        final List<SourceRecord> records = new ArrayList<>();
+        for (DataChangeEvent event : batch) {
+            if (isEndWatermarkEvent(event.getRecord())) {
+                hasNextElement.set(false);
+                break;
+            }
+            records.add(event.getRecord());
+        }
 
-                    if (reachChangeLogStart && isEndWatermarkEvent(record)) {
-                        // capture to end watermark events, stop the loop
-                        reachChangeLogEnd = true;
-                        break;
-                    }
+        return Collections.singletonList(new 
SourceRecords(records)).iterator();
+    }
+
+    public Iterator<SourceRecords> pollWithBuffer() throws 
InterruptedException {
+        // eg:
+        // data input: [low watermark event][snapshot events][high watermark 
event][change
+        // events][end watermark event]
+        // data output: [low watermark event][normalized events][high 
watermark event]
+        boolean reachChangeLogStart = false;
+        boolean reachChangeLogEnd = false;
+        SourceRecord lowWatermark = null;
+        SourceRecord highWatermark = null;
+        Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+        while (!reachChangeLogEnd) {
+            checkReadException();
+            List<DataChangeEvent> batch = queue.poll();
+            for (DataChangeEvent event : batch) {
+                SourceRecord record = event.getRecord();
+                if (lowWatermark == null) {
+                    lowWatermark = record;
+                    assertLowWatermark(lowWatermark);
+                    continue;
+                }
+
+                if (highWatermark == null && isHighWatermarkEvent(record)) {
+                    highWatermark = record;
+                    // snapshot events capture end and begin to capture stream 
events
+                    reachChangeLogStart = true;
+                    continue;
+                }
+
+                if (reachChangeLogStart && isEndWatermarkEvent(record)) {
+                    // capture to end watermark events, stop the loop
+                    reachChangeLogEnd = true;
+                    break;
+                }
 
-                    if (!reachChangeLogStart) {
-                        outputBuffer.put((Struct) record.key(), record);
-                    } else {
-                        if (isChangeRecordInChunkRange(record)) {
-                            // rewrite overlapping snapshot records through 
the record key
-                            taskContext.rewriteOutputBuffer(outputBuffer, 
record);
-                        }
+                if (!reachChangeLogStart) {
+                    outputBuffer.put((Struct) record.key(), record);
+                } else {
+                    if (isChangeRecordInChunkRange(record)) {
+                        // rewrite overlapping snapshot records through the 
record key
+                        taskContext.rewriteOutputBuffer(outputBuffer, record);
                     }
                 }
             }
-            // snapshot split return its data once
-            hasNextElement.set(false);
+        }
+        // snapshot split return its data once
+        hasNextElement.set(false);
 
-            final List<SourceRecord> normalizedRecords = new ArrayList<>();
-            normalizedRecords.add(lowWatermark);
-            
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
-            normalizedRecords.add(highWatermark);
+        final List<SourceRecord> normalizedRecords = new ArrayList<>();
+        normalizedRecords.add(lowWatermark);
+        
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
+        normalizedRecords.add(highWatermark);
 
-            final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
-            sourceRecordsSet.add(new SourceRecords(normalizedRecords));
-            return sourceRecordsSet.iterator();
-        }
-        // the data has been polled, no more data
-        reachEnd.compareAndSet(false, true);
-        return null;
+        final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+        return sourceRecordsSet.iterator();
     }
 
     private void checkReadException() {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 257f7ce1f..659c68604 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -517,6 +517,15 @@ class MongoDBFullChangelogITCase extends 
MongoDBSourceTestBase {
                     mongoCollection.updateOne(
                             Filters.eq("cid", 2000L), Updates.set("address", 
"Pittsburgh"));
                     mongoCollection.deleteOne(Filters.eq("cid", 1019L));
+
+                    // Rarely happens, but if there's no operation or 
heartbeat events between
+                    // watermark #a (the ChangeStream opLog caused by the last 
event in this hook)
+                    // and watermark #b (the calculated high watermark that 
limits the bounded
+                    // back-filling stream fetch task), the last event of hook 
will be missed since
+                    // back-filling task reads between [loW, hiW) (high 
watermark not included).
+                    // Workaround: insert a dummy event in another collection 
to forcefully push
+                    // opLog forward.
+                    database.getCollection("customers_1").insertOne(new 
Document());
                 };
 
         switch (hookType) {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
index 606bc9911..81f3d2c84 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
@@ -786,6 +786,8 @@ class NewlyAddedTableITCase extends MongoDBSourceTestBase {
             waitForUpsertSinkSize("sink", fetchedDataList.size());
             MongoDBAssertUtils.assertEqualsInAnyOrder(
                     fetchedDataList, 
TestValuesTableFactory.getResultsAsStrings("sink"));
+            // Wait 1s until snapshot phase finished, make sure the binlog 
data is not lost.
+            Thread.sleep(1000L);
 
             // step 3: make some changelog data for this round
             makeFirstPartOplogForAddressCollection(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index b86e6e527..b7b23a18e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -282,79 +282,103 @@ public class SnapshotSplitReader implements 
DebeziumReader<SourceRecords, MySqlS
         checkReadException();
 
         if (hasNextElement.get()) {
-            // data input: [low watermark event][snapshot events][high 
watermark event][binlog
-            // events][binlog-end event]
-            // data output: [low watermark event][normalized events][high 
watermark event]
-            boolean reachBinlogStart = false;
-            boolean reachBinlogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-
-            Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
-            while (!reachBinlogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if 
(statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
 
-                    if (highWatermark == null && 
RecordUtils.isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture 
binlog events
-                        reachBinlogStart = true;
-                        continue;
-                    }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (reachBinlogStart && 
RecordUtils.isEndWatermarkEvent(record)) {
-                        // capture to end watermark events, stop the loop
-                        reachBinlogEnd = true;
-                        break;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws 
InterruptedException {
+        checkReadException();
+        List<DataChangeEvent> batch = queue.poll();
+        final List<SourceRecord> records = new ArrayList<>();
+        for (DataChangeEvent event : batch) {
+            if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
+                hasNextElement.set(false);
+                break;
+            }
+            records.add(event.getRecord());
+        }
 
-                    if (!reachBinlogStart) {
-                        if (record.key() != null) {
-                            snapshotRecords.put(
-                                    (Struct) record.key(), 
Collections.singletonList(record));
-                        } else {
-                            List<SourceRecord> records =
-                                    snapshotRecords.computeIfAbsent(
-                                            (Struct) record.value(), key -> 
new LinkedList<>());
-                            records.add(record);
-                        }
+        return Collections.singletonList(new 
SourceRecords(records)).iterator();
+    }
+
+    public Iterator<SourceRecords> pollWithBuffer() throws 
InterruptedException {
+        // data input: [low watermark event][snapshot events][high watermark 
event][binlog
+        // events][binlog-end event]
+        // data output: [low watermark event][normalized events][high 
watermark event]
+        boolean reachBinlogStart = false;
+        boolean reachBinlogEnd = false;
+        SourceRecord lowWatermark = null;
+        SourceRecord highWatermark = null;
+
+        Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
+        while (!reachBinlogEnd) {
+            checkReadException();
+            List<DataChangeEvent> batch = queue.poll();
+            for (DataChangeEvent event : batch) {
+                SourceRecord record = event.getRecord();
+                if (lowWatermark == null) {
+                    lowWatermark = record;
+                    assertLowWatermark(lowWatermark);
+                    continue;
+                }
+
+                if (highWatermark == null && 
RecordUtils.isHighWatermarkEvent(record)) {
+                    highWatermark = record;
+                    // snapshot events capture end and begin to capture binlog 
events
+                    reachBinlogStart = true;
+                    continue;
+                }
+
+                if (reachBinlogStart && 
RecordUtils.isEndWatermarkEvent(record)) {
+                    // capture to end watermark events, stop the loop
+                    reachBinlogEnd = true;
+                    break;
+                }
+
+                if (!reachBinlogStart) {
+                    if (record.key() != null) {
+                        snapshotRecords.put(
+                                (Struct) record.key(), 
Collections.singletonList(record));
                     } else {
-                        RecordUtils.upsertBinlog(
-                                snapshotRecords,
-                                record,
-                                currentSnapshotSplit.getSplitKeyType(),
-                                nameAdjuster,
-                                currentSnapshotSplit.getSplitStart(),
-                                currentSnapshotSplit.getSplitEnd());
+                        List<SourceRecord> records =
+                                snapshotRecords.computeIfAbsent(
+                                        (Struct) record.value(), key -> new 
LinkedList<>());
+                        records.add(record);
                     }
+                } else {
+                    RecordUtils.upsertBinlog(
+                            snapshotRecords,
+                            record,
+                            currentSnapshotSplit.getSplitKeyType(),
+                            nameAdjuster,
+                            currentSnapshotSplit.getSplitStart(),
+                            currentSnapshotSplit.getSplitEnd());
                 }
             }
-            // snapshot split return its data once
-            hasNextElement.set(false);
-
-            final List<SourceRecord> normalizedRecords = new ArrayList<>();
-            normalizedRecords.add(lowWatermark);
-            normalizedRecords.addAll(
-                    RecordUtils.formatMessageTimestamp(
-                            snapshotRecords.values().stream()
-                                    .flatMap(Collection::stream)
-                                    .collect(Collectors.toList())));
-            normalizedRecords.add(highWatermark);
-
-            final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
-            sourceRecordsSet.add(new SourceRecords(normalizedRecords));
-            return sourceRecordsSet.iterator();
         }
-        // the data has been polled, no more data
-        reachEnd.compareAndSet(false, true);
-        return null;
+        // snapshot split return its data once
+        hasNextElement.set(false);
+
+        final List<SourceRecord> normalizedRecords = new ArrayList<>();
+        normalizedRecords.add(lowWatermark);
+        normalizedRecords.addAll(
+                RecordUtils.formatMessageTimestamp(
+                        snapshotRecords.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList())));
+        normalizedRecords.add(highWatermark);
+
+        final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+        return sourceRecordsSet.iterator();
     }
 
     private void checkReadException() {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
index e1178982b..61648756d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -102,7 +103,7 @@ public class MySqlSplitReader implements 
SplitReader<SourceRecords, MySqlSplit>
         Iterator<SourceRecords> dataIt;
         if (currentReader == null) {
             // (1) Reads binlog split firstly and then read snapshot split
-            if (binlogSplits.size() > 0) {
+            if (!binlogSplits.isEmpty()) {
                 // the binlog split may come from:
                 // (a) the initial binlog split
                 // (b) added back binlog-split in newly added table process
@@ -110,7 +111,7 @@ public class MySqlSplitReader implements 
SplitReader<SourceRecords, MySqlSplit>
                 currentSplitId = nextSplit.splitId();
                 currentReader = getBinlogSplitReader();
                 currentReader.submitSplit(nextSplit);
-            } else if (snapshotSplits.size() > 0) {
+            } else if (!snapshotSplits.isEmpty()) {
                 MySqlSplit nextSplit = snapshotSplits.poll();
                 currentSplitId = nextSplit.splitId();
                 currentReader = getSnapshotSplitReader();
@@ -119,19 +120,21 @@ public class MySqlSplitReader implements 
SplitReader<SourceRecords, MySqlSplit>
                 LOG.info("No available split to read.");
             }
             dataIt = currentReader.pollSplitRecords();
-            return dataIt == null ? finishedSplit() : forRecords(dataIt);
+            return dataIt == null ? finishedSplit(true) : 
forUnfinishedRecords(dataIt);
         } else if (currentReader instanceof SnapshotSplitReader) {
-            // (2) try to switch to binlog split reading util current snapshot 
split finished
             dataIt = currentReader.pollSplitRecords();
             if (dataIt != null) {
                 // first fetch data of snapshot split, return and emit the 
records of snapshot split
-                MySqlRecords records;
+                return forUnfinishedRecords(dataIt);
+            } else {
+                // (2) try to switch to binlog split reading util current 
snapshot split finished
+                MySqlRecords finishedRecords;
                 if (context.isHasAssignedBinlogSplit()) {
-                    records = forNewAddedTableFinishedSplit(currentSplitId, 
dataIt);
+                    finishedRecords = 
forNewAddedTableFinishedSplit(currentSplitId);
                     closeSnapshotReader();
                     closeBinlogReader();
                 } else {
-                    records = forRecords(dataIt);
+                    finishedRecords = finishedSplit(false);
                     MySqlSplit nextSplit = snapshotSplits.poll();
                     if (nextSplit != null) {
                         currentSplitId = nextSplit.splitId();
@@ -140,9 +143,7 @@ public class MySqlSplitReader implements 
SplitReader<SourceRecords, MySqlSplit>
                         closeSnapshotReader();
                     }
                 }
-                return records;
-            } else {
-                return finishedSplit();
+                return finishedRecords;
             }
         } else if (currentReader instanceof BinlogSplitReader) {
             // (3) switch to snapshot split reading if there are newly added 
snapshot splits
@@ -157,46 +158,41 @@ public class MySqlSplitReader implements 
SplitReader<SourceRecords, MySqlSplit>
                     currentReader = getSnapshotSplitReader();
                     currentReader.submitSplit(nextSplit);
                 }
-                return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+                return MySqlRecords.forUnfinishedRecords(BINLOG_SPLIT_ID, 
dataIt);
             } else {
                 // null will be returned after receiving suspend binlog event
                 // finish current binlog split reading
                 closeBinlogReader();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
         }
     }
 
-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycleScanFetcher) {
         final MySqlRecords finishedRecords = 
MySqlRecords.forFinishedSplit(currentSplitId);
+        if (recycleScanFetcher) {
+            closeSnapshotReader();
+        }
         currentSplitId = null;
         return finishedRecords;
     }
 
-    private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
-        if (currentReader instanceof SnapshotSplitReader) {
-            final MySqlRecords finishedRecords =
-                    MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
-            closeSnapshotReader();
-            return finishedRecords;
-        } else {
-            return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
-        }
+    private MySqlRecords forUnfinishedRecords(Iterator<SourceRecords> dataIt) {
+        return MySqlRecords.forUnfinishedRecords(currentSplitId, dataIt);
     }
 
     /**
      * Finishes new added snapshot split, mark the binlog split as finished 
too, we will add the
      * binlog split back in {@code MySqlSourceReader}.
      */
-    private MySqlRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
+    private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(BINLOG_SPLIT_ID);
         currentSplitId = null;
-        return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
+        return new MySqlRecords(splitId, Collections.emptyIterator(), 
finishedSplits);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
index 9d6192aa5..4b6145237 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
@@ -76,16 +76,11 @@ public final class MySqlRecords implements 
RecordsWithSplitIds<SourceRecords> {
         return finishedSnapshotSplits;
     }
 
-    public static MySqlRecords forBinlogRecords(
+    public static MySqlRecords forUnfinishedRecords(
             final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
         return new MySqlRecords(splitId, recordsForSplit, 
Collections.emptySet());
     }
 
-    public static MySqlRecords forSnapshotRecords(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
-        return new MySqlRecords(splitId, recordsForSplit, 
Collections.singleton(splitId));
-    }
-
     public static MySqlRecords forFinishedSplit(final String splitId) {
         return new MySqlRecords(null, null, Collections.singleton(splitId));
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 274ffaacd..d53307d7c 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -168,7 +168,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
 
     public static Stream<Arguments> parameters() {
         return Stream.of(
-                Arguments.of("customers", null, null),
+                Arguments.of("customers", null, "false"),
                 Arguments.of("customers", "id", "true"),
                 Arguments.of("customers_no_pk", "id", "true"));
     }
@@ -184,13 +184,18 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
     void testReadSingleTableWithSingleParallelismAndSkipBackFill(
             String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
+        HashMap<String, String> option = new HashMap<>();
+        option.put("scan.incremental.snapshot.backfill.skip", "true");
+        option.put("scan.incremental.snapshot.chunk.size", "1");
         testMySqlParallelSource(
                 1,
                 DEFAULT_SCAN_STARTUP_MODE,
@@ -198,10 +203,11 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 FailoverPhase.NEVER,
                 new String[] {tableName},
                 RestartStrategies.fixedDelayRestart(1, 0),
-                true,
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -215,7 +221,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -229,7 +237,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName, "customers_1"},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -243,7 +253,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName, "customers_1"},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     // Failover tests
@@ -257,7 +269,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName, "customers_1"},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -270,7 +284,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName, "customers_1"},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -286,7 +302,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 RestartStrategies.fixedDelayRestart(1, 0),
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -299,7 +317,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName, "customers_1"},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -312,7 +332,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName, "customers_1"},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -328,7 +350,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 RestartStrategies.fixedDelayRestart(1, 0),
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -342,7 +366,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
@@ -356,7 +382,26 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 new String[] {tableName},
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
+    }
+
+    @Test
+    public void testReadSingleTableMutilpleFetch() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("debezium.snapshot.fetch.size", "2");
+        options.put("debezium.max.batch.size", "3");
+        testMySqlParallelSource(
+                1,
+                DEFAULT_SCAN_STARTUP_MODE,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                new String[] {"customers"},
+                RestartStrategies.fixedDelayRestart(1, 0),
+                "customers",
+                "id",
+                options);
     }
 
     @ParameterizedTest
@@ -452,10 +497,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 StartupOptions.latest(), Collections.emptyList(), tableName, 
chunkColumnName);
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testSnapshotOnlyModeWithDMLPostHighWaterMark(String tableName, String 
chunkColumnName)
-            throws Exception {
+    @Test
+    void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
         List<String> records =
                 testBackfillWhenWritingEvents(
                         false, 21, USE_POST_HIGHWATERMARK_HOOK, 
StartupOptions.snapshot());
@@ -485,10 +528,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testSnapshotOnlyModeWithDMLPreHighWaterMark(String tableName, String 
chunkColumnName)
-            throws Exception {
+    @Test
+    void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
         List<String> records =
                 testBackfillWhenWritingEvents(
                         false, 21, USE_PRE_HIGHWATERMARK_HOOK, 
StartupOptions.snapshot());
@@ -520,10 +561,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testEnableBackfillWithDMLPreHighWaterMark(String tableName, String 
chunkColumnName)
-            throws Exception {
+    @Test
+    void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
 
         List<String> records =
                 testBackfillWhenWritingEvents(
@@ -557,10 +596,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testEnableBackfillWithDMLPostLowWaterMark(String tableName, String 
chunkColumnName)
-            throws Exception {
+    @Test
+    void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
 
         List<String> records =
                 testBackfillWhenWritingEvents(
@@ -594,10 +631,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testSkipBackfillWithDMLPreHighWaterMark(String tableName, String 
chunkColumnName)
-            throws Exception {
+    @Test
+    void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
 
         List<String> records =
                 testBackfillWhenWritingEvents(
@@ -635,10 +670,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testSkipBackfillWithDMLPostLowWaterMark(String tableName, String 
chunkColumnName)
-            throws Exception {
+    @Test
+    void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
 
         List<String> records =
                 testBackfillWhenWritingEvents(
@@ -677,106 +710,6 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @SuppressWarnings("unchecked")
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testSourceMetrics(String tableName, String chunkColumnName) throws 
Exception {
-        customDatabase.createAndInitialize();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        MySqlSource<String> source =
-                MySqlSource.<String>builder()
-                        .hostname(MYSQL_CONTAINER.getHost())
-                        .port(MYSQL_CONTAINER.getDatabasePort())
-                        .databaseList(customDatabase.getDatabaseName())
-                        .tableList(customDatabase.getDatabaseName() + 
".customers")
-                        .username(customDatabase.getUsername())
-                        .password(customDatabase.getPassword())
-                        .deserializer(new 
StringDebeziumDeserializationSchema())
-                        .serverId(getServerId())
-                        .serverTimeZone("UTC")
-                        .build();
-        DataStreamSource<String> stream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"MySQL CDC Source");
-        CollectResultIterator<String> iterator = addCollector(env, stream);
-        JobClient jobClient = env.executeAsync();
-        iterator.setJobClient(jobClient);
-
-        // ---------------------------- Snapshot phase 
------------------------------
-        // Wait until we receive all 21 snapshot records
-        int numSnapshotRecordsExpected = 21;
-        int numSnapshotRecordsReceived = 0;
-        while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && 
iterator.hasNext()) {
-            iterator.next();
-            numSnapshotRecordsReceived++;
-        }
-
-        // Check metrics
-        List<OperatorMetricGroup> metricGroups =
-                metricReporter.findOperatorMetricGroups(jobClient.getJobID(), 
"MySQL CDC Source");
-        // There should be only 1 parallelism of source, so it's safe to get 
the only group
-        OperatorMetricGroup group = metricGroups.get(0);
-        Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);
-
-        // numRecordsOut
-        
Assertions.assertThat(group.getIOMetricGroup().getNumRecordsOutCounter().getCount())
-                .isEqualTo(numSnapshotRecordsExpected);
-
-        // currentEmitEventTimeLag should be UNDEFINED during snapshot phase
-        
Assertions.assertThat(metrics).containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
-        Gauge<Long> currentEmitEventTimeLag =
-                (Gauge<Long>) 
metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
-        Assertions.assertThat((long) currentEmitEventTimeLag.getValue())
-                .isEqualTo(InternalSourceReaderMetricGroup.UNDEFINED);
-
-        // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
-        
Assertions.assertThat(metrics).containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
-        Gauge<Long> currentFetchEventTimeLag =
-                (Gauge<Long>) 
metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
-        Assertions.assertThat((long) currentFetchEventTimeLag.getValue())
-                .isEqualTo(MySqlSourceReaderMetrics.UNDEFINED);
-
-        // sourceIdleTime should be positive (we can't know the exact value)
-        
Assertions.assertThat(metrics).containsKey(MetricNames.SOURCE_IDLE_TIME);
-        Gauge<Long> sourceIdleTime = (Gauge<Long>) 
metrics.get(MetricNames.SOURCE_IDLE_TIME);
-        Assertions.assertThat(sourceIdleTime.getValue())
-                .isPositive()
-                .isLessThan(TIMEOUT.toMillis());
-
-        // --------------------------------- Binlog phase 
-----------------------------
-        makeFirstPartBinlogEvents(getConnection(), 
customDatabase.qualifiedTableName("customers"));
-        // Wait until we receive 4 changes made above
-        int numBinlogRecordsExpected = 4;
-        int numBinlogRecordsReceived = 0;
-        while (numBinlogRecordsReceived < numBinlogRecordsExpected && 
iterator.hasNext()) {
-            iterator.next();
-            numBinlogRecordsReceived++;
-        }
-
-        // Check metrics
-        // numRecordsOut
-        
Assertions.assertThat(group.getIOMetricGroup().getNumRecordsOutCounter().getCount())
-                .isEqualTo(numSnapshotRecordsExpected + 
numBinlogRecordsExpected);
-
-        // currentEmitEventTimeLag should be reasonably positive (we can't 
know the exact value)
-        Assertions.assertThat(currentEmitEventTimeLag.getValue())
-                .isPositive()
-                .isLessThan(TIMEOUT.toMillis());
-
-        // currentEmitEventTimeLag should be reasonably positive (we can't 
know the exact value)
-        Assertions.assertThat(currentFetchEventTimeLag.getValue())
-                .isPositive()
-                .isLessThan(TIMEOUT.toMillis());
-
-        // currentEmitEventTimeLag should be reasonably positive (we can't 
know the exact value)
-        Assertions.assertThat(sourceIdleTime.getValue())
-                .isPositive()
-                .isLessThan(TIMEOUT.toMillis());
-
-        jobClient.cancel().get();
-        iterator.close();
-    }
-
     private List<String> testBackfillWhenWritingEvents(
             boolean skipSnapshotBackfill,
             int fetchSize,
@@ -1077,7 +1010,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
             String[] captureCustomerTables,
             String tableName,
             String chunkColumnName,
-            String assignEndingFirst)
+            Map<String, String> options)
             throws Exception {
         testMySqlParallelSource(
                 DEFAULT_PARALLELISM,
@@ -1086,7 +1019,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 captureCustomerTables,
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
+                options);
     }
 
     private void testMySqlParallelSource(
@@ -1096,7 +1029,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
             String[] captureCustomerTables,
             String tableName,
             String chunkColumnName,
-            String assignEndingFirst)
+            Map<String, String> options)
             throws Exception {
         testMySqlParallelSource(
                 parallelism,
@@ -1107,31 +1040,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                 RestartStrategies.fixedDelayRestart(1, 0),
                 tableName,
                 chunkColumnName,
-                assignEndingFirst);
-    }
-
-    private void testMySqlParallelSource(
-            int parallelism,
-            String scanStartupMode,
-            FailoverType failoverType,
-            FailoverPhase failoverPhase,
-            String[] captureCustomerTables,
-            RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration,
-            String tableName,
-            String chunkColumnName,
-            String assignEndingFirst)
-            throws Exception {
-        testMySqlParallelSource(
-                parallelism,
-                scanStartupMode,
-                failoverType,
-                failoverPhase,
-                captureCustomerTables,
-                restartStrategyConfiguration,
-                false,
-                tableName,
-                chunkColumnName,
-                assignEndingFirst);
+                options);
     }
 
     private void testMySqlParallelSource(
@@ -1141,10 +1050,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
             FailoverPhase failoverPhase,
             String[] captureCustomerTables,
             RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration,
-            boolean skipSnapshotBackfill,
             String tableName,
             String chunkColumnName,
-            String assignEndingFirst)
+            Map<String, String> otherOptions)
             throws Exception {
         customDatabase.createAndInitialize();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1174,9 +1082,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.startup.mode' = '%s',"
                                 + " 'scan.incremental.snapshot.chunk.size' = 
'100',"
-                                + " 'scan.incremental.snapshot.backfill.skip' 
= '%s',"
                                 + " 'server-time-zone' = 'UTC',"
-                                + " 
'scan.incremental.snapshot.unbounded-chunk-first.enabled' = '%s',"
                                 + " 'server-id' = '%s'"
                                 + " %s"
                                 + ")",
@@ -1187,14 +1093,22 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
                         customDatabase.getDatabaseName(),
                         getTableNameRegex(captureCustomerTables),
                         scanStartupMode,
-                        skipSnapshotBackfill,
-                        assignEndingFirst == null ? "false" : "true",
                         getServerId(),
                         chunkColumnName == null
                                 ? ""
                                 : String.format(
                                         ", 
'scan.incremental.snapshot.chunk.key-column' = '%s'",
-                                        chunkColumnName));
+                                        chunkColumnName),
+                        otherOptions.isEmpty()
+                                ? ""
+                                : ","
+                                        + otherOptions.entrySet().stream()
+                                                .map(
+                                                        e ->
+                                                                String.format(
+                                                                        
"'%s'='%s'",
+                                                                        
e.getKey(), e.getValue()))
+                                                
.collect(Collectors.joining(",")));
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("select * from customers");
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
index 9495234bb..692bd866f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
@@ -831,6 +831,8 @@ class NewlyAddedTableITCase extends MySqlSourceTestBase {
             waitForUpsertSinkSize("sink", fetchedDataList.size());
             assertEqualsInAnyOrder(
                     fetchedDataList, 
TestValuesTableFactory.getResultsAsStrings("sink"));
+            // Wait 1s until snapshot phase finished, make sure the binlog 
data is not lost.
+            Thread.sleep(1000L);
 
             // step 3: make some binlog data for this round
             makeFirstPartBinlogForAddressTable(getConnection(), 
newlyAddedTable);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index ae3d78c5d..3f0cd2a2e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -69,6 +69,8 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,6 +103,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isH
 import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
 import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
+import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
 import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -184,8 +187,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                     "+I[2000, user_21, Shanghai, 123567891234]"
                 };
         // Step 2: wait the snapshot splits finished reading
-        Thread.sleep(5000L);
-        List<String> actualRecords = consumeRecords(reader, dataType);
+        List<String> actualRecords = consumeSnapshotRecords(reader, dataType);
         assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
         reader.handleSourceEvents(
                 new FinishedSnapshotSplitsAckEvent(
@@ -213,10 +215,12 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
         restartReader.close();
     }
 
-    @Test
-    void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testFinishedUnackedSplitsUsingStateFromSnapshotPhase(boolean 
skipBackFill)
+            throws Exception {
         customerDatabase.createAndInitialize();
-        final MySqlSourceConfig sourceConfig = getConfig(new String[] 
{"customers"});
+        final MySqlSourceConfig sourceConfig = getConfig(new String[] 
{"customers"}, skipBackFill);
         final DataType dataType =
                 DataTypes.ROW(
                         DataTypes.FIELD("id", DataTypes.BIGINT()),
@@ -294,8 +298,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                     "+I[2000, user_21, Shanghai, 123567891234]"
                 };
         // Step 2: wait the snapshot splits finished reading
-        Thread.sleep(5000L);
-        List<String> actualRecords = consumeRecords(reader, dataType);
+        List<String> actualRecords = consumeSnapshotRecords(reader, dataType);
         assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
 
         // Step 3: snapshot reader's state
@@ -352,7 +355,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                     "+U[103, user_3, Hangzhou, 123567891234]"
                 };
         // the 2 records are produced by 1 operations
-        List<String> actualRecords = consumeRecords(reader, dataType);
+        List<String> actualRecords = consumeBinlogRecords(reader, dataType);
         assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords);
         List<MySqlSplit> splitsState = reader.snapshotState(1L);
         // check the binlog split state
@@ -373,7 +376,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                     "+U[103, user_3, Shanghai, 123567891234]"
                 };
         // the 4 records are produced by 3 operations
-        List<String> restRecords = consumeRecords(restartReader, dataType);
+        List<String> restRecords = consumeBinlogRecords(restartReader, 
dataType);
         assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords);
         restartReader.close();
     }
@@ -610,6 +613,10 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
     }
 
     private MySqlSourceConfig getConfig(String[] captureTables) {
+        return getConfig(captureTables, false);
+    }
+
+    private MySqlSourceConfig getConfig(String[] captureTables, boolean 
skipBackFill) {
         String[] captureTableIds =
                 Arrays.stream(captureTables)
                         .map(tableName -> customerDatabase.getDatabaseName() + 
"." + tableName)
@@ -627,21 +634,34 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                 .username(customerDatabase.getUsername())
                 .password(customerDatabase.getPassword())
                 .serverTimeZone(ZoneId.of("UTC").toString())
+                .skipSnapshotBackfill(skipBackFill)
                 .createConfig(0);
     }
 
-    private List<String> consumeRecords(
+    private List<String> consumeSnapshotRecords(
             MySqlSourceReader<SourceRecord> sourceReader, DataType recordType) 
throws Exception {
-        // Poll all the n records of the single split.
+        // Poll all the  records of the multiple assigned snapshot split.
+        sourceReader.notifyNoMoreSplits();
         final SimpleReaderOutput output = new SimpleReaderOutput();
         InputStatus status = MORE_AVAILABLE;
-        while (MORE_AVAILABLE == status || output.getResults().size() == 0) {
+        while (END_OF_INPUT != status) {
             status = sourceReader.pollNext(output);
         }
         final RecordsFormatter formatter = new RecordsFormatter(recordType);
         return formatter.format(output.getResults());
     }
 
+    private List<String> consumeBinlogRecords(
+            MySqlSourceReader<SourceRecord> sourceReader, DataType recordType) 
throws Exception {
+        // Poll one batch records of the binlog split.
+        final SimpleReaderOutput output = new SimpleReaderOutput();
+        while (output.getResults().isEmpty()) {
+            sourceReader.pollNext(output);
+        }
+        final RecordsFormatter formatter = new RecordsFormatter(recordType);
+        return formatter.format(output.getResults());
+    }
+
     // ------------------------------------------------------------------------
     //  test utilities
     // ------------------------------------------------------------------------
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 7ec84611a..7cc5fe586 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -1419,6 +1419,9 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
                 Deadline.fromNow(Duration.ofSeconds(10)));
         CloseableIterator<Row> iterator = result.collect();
         waitForSnapshotStarted(iterator);
+        // Wait 1s until snapshot phase finished, cannot update DDL during 
snapshot phase.
+        List<String> actualRows = new ArrayList<>(fetchRows(iterator, 2));
+        Thread.sleep(1000L);
         try (Connection connection = customerDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DELETE FROM default_value_test WHERE id=1;");
@@ -1506,7 +1509,8 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
                             + "     tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 '"
                             + " );");
         }
-        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, 
expected.length));
+        actualRows.addAll(fetchRows(iterator, expected.length - 2));
+        assertEqualsInAnyOrder(Arrays.asList(expected), actualRows);
         jobClient.cancel().get();
     }
 
@@ -1554,6 +1558,10 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
                 Deadline.fromNow(Duration.ofSeconds(10)));
         CloseableIterator<Row> iterator = result.collect();
         waitForSnapshotStarted(iterator);
+        // Wait 1s until snapshot phase finished, cannot update DDL during 
snapshot phase.
+        List<String> actualRows = new ArrayList<>(fetchRows(iterator, 2));
+        Thread.sleep(1000L);
+
         try (Connection connection = customerDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DELETE FROM default_value_test WHERE id=1;");
@@ -1572,7 +1580,8 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
             statement.execute(
                     "alter table default_value_test add column `int_test` INT 
DEFAULT ' 30 ';");
         }
-        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, 
expected.length));
+        actualRows.addAll(fetchRows(iterator, expected.length - 2));
+        assertEqualsInAnyOrder(Arrays.asList(expected), actualRows);
         jobClient.cancel().get();
     }
 
@@ -2252,7 +2261,7 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
 
     private static void waitForSnapshotStarted(CloseableIterator<Row> 
iterator) throws Exception {
         while (!iterator.hasNext()) {
-            Thread.sleep(100);
+            Thread.sleep(1000);
         }
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
index 3cf295501..5dab6cd6f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
@@ -687,6 +687,8 @@ class NewlyAddedTableITCase extends OracleSourceTestBase {
             waitForUpsertSinkSize("sink", fetchedDataList.size());
             assertEqualsInAnyOrder(
                     fetchedDataList, 
TestValuesTableFactory.getResultsAsStrings("sink"));
+            // Wait 1s until snapshot phase finished, make sure the binlog 
data is not lost.
+            Thread.sleep(1000L);
 
             // step 3: make some redo log data for this round
             makeFirstPartRedoLogForAddressTable(newlyAddedTable);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index 548108f94..ee868d0b8 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -309,7 +309,7 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
                                     snapshotSplit.getSplitStart(),
                                     snapshotSplit.getSplitEnd(),
                                     
snapshotSplit.getSplitKeyType().getFieldCount(),
-                                    connectorConfig.getQueryFetchSize());
+                                    connectorConfig.getSnapshotFetchSize());
                     ResultSet rs = selectStatement.executeQuery()) {
 
                 ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, 
table);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
index f9016cd27..bee31f981 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
@@ -725,6 +725,8 @@ class NewlyAddedTableITCase extends PostgresTestBase {
             PostgresTestUtils.waitForUpsertSinkSize("sink", 
fetchedDataList.size());
             assertEqualsInAnyOrder(
                     fetchedDataList, 
TestValuesTableFactory.getResultsAsStrings("sink"));
+            // Wait 1s until snapshot phase finished, make sure the binlog 
data is not lost.
+            Thread.sleep(1000L);
 
             // step 3: make some wal log data for this round
             makeFirstPartWalLogForAddressTable(getConnection(), 
newlyAddedTable);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 5d212c342..da0ac487b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -377,6 +377,21 @@ class PostgresSourceITCase extends PostgresTestBase {
         optionalJobClient.get().cancel().get();
     }
 
+    @Test
+    void testReadSingleTableMutilpleFetch() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("debezium.snapshot.fetch.size", "2");
+        options.put("debezium.max.batch.size", "3");
+        testPostgresParallelSource(
+                1,
+                DEFAULT_SCAN_STARTUP_MODE,
+                PostgresTestUtils.FailoverType.NONE,
+                PostgresTestUtils.FailoverPhase.NEVER,
+                new String[] {"Customers"},
+                RestartStrategies.fixedDelayRestart(1, 0),
+                options);
+    }
+
     @Test
     void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
         // The data num is 21, set fetchSize = 22 to test the job is bounded.
@@ -1069,7 +1084,6 @@ class PostgresSourceITCase extends PostgresTestBase {
             PostgresTestUtils.FailoverPhase failoverPhase,
             String[] captureCustomerTables)
             throws Exception {
-
         waitUntilJobRunning(tableResult);
         CloseableIterator<Row> iterator = tableResult.collect();
         Optional<JobClient> optionalJobClient = tableResult.getJobClient();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index 509064c5f..ee3dcdcb0 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
 import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
 import org.apache.flink.cdc.connectors.postgres.testutils.TestTableId;
 import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
@@ -44,14 +45,19 @@ import org.apache.flink.table.types.DataType;
 
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import org.apache.kafka.connect.source.SourceRecord;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link PostgresScanFetchTask}. */
 class PostgresScanFetchTaskTest extends PostgresTestBase {
@@ -237,6 +243,45 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testSnapshotFetchSize() throws Exception {
+        customDatabase.createAndInitialize();
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(customDatabase, schemaName, 
tableName, 10, true);
+        Properties properties = new Properties();
+        properties.setProperty("snapshot.fetch.size", "2");
+        sourceConfigFactory.debeziumProperties(properties);
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect postgresDialect = new 
PostgresDialect(sourceConfigFactory.create(0));
+        SnapshotSplit snapshotSplit = getSnapshotSplits(sourceConfig, 
postgresDialect).get(0);
+        PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, 
postgresDialect);
+        final String selectSql =
+                PostgresQueryUtils.buildSplitScanQuery(
+                        snapshotSplit.getTableId(),
+                        snapshotSplit.getSplitKeyType(),
+                        snapshotSplit.getSplitStart() == null,
+                        snapshotSplit.getSplitEnd() == null,
+                        Collections.emptyList());
+
+        try (PostgresConnection jdbcConnection = 
postgresDialect.openJdbcConnection();
+                PreparedStatement selectStatement =
+                        PostgresQueryUtils.readTableSplitDataStatement(
+                                jdbcConnection,
+                                selectSql,
+                                snapshotSplit.getSplitStart() == null,
+                                snapshotSplit.getSplitEnd() == null,
+                                snapshotSplit.getSplitStart(),
+                                snapshotSplit.getSplitEnd(),
+                                
snapshotSplit.getSplitKeyType().getFieldCount(),
+                                postgresSourceFetchTaskContext
+                                        .getDbzConnectorConfig()
+                                        .getSnapshotFetchSize());
+                ResultSet rs = selectStatement.executeQuery()) {
+            assertThat(rs.getFetchSize()).isEqualTo(2);
+        }
+    }
+
     private List<String> getDataInSnapshotScan(
             String[] changingDataSql,
             String schemaName,
@@ -314,8 +359,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
 
         sourceScanFetcher.close();
 
-        
Assertions.assertThat(sourceScanFetcher.getExecutorService()).isNotNull();
-        
Assertions.assertThat(sourceScanFetcher.getExecutorService().isTerminated()).isTrue();
+        assertThat(sourceScanFetcher.getExecutorService()).isNotNull();
+        
assertThat(sourceScanFetcher.getExecutorService().isTerminated()).isTrue();
 
         return formatResult(result, dataType);
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 905d47131..68ef4690e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -17,27 +17,69 @@
 
 package org.apache.flink.cdc.connectors.postgres.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
 import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
-import org.apache.flink.cdc.connectors.postgres.testutils.TestTable;
+import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
 
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.table.api.DataTypes.BIGINT;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
+import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link PostgresSourceReader}. */
-class PostgresSourceReaderTest {
+public class PostgresSourceReaderTest extends PostgresTestBase {
+    private static final String DB_NAME_PREFIX = "postgres";
+    private static final String SCHEMA_NAME = "customer";
+    private String slotName;
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    DB_NAME_PREFIX,
+                    SCHEMA_NAME,
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    @BeforeEach
+    public void before() throws SQLException {
+        customDatabase.createAndInitialize();
+
+        this.slotName = getSlotName();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        customDatabase.removeSlot(slotName);
+    }
 
     @Test
     void testNotifyCheckpointWindowSizeOne() throws Exception {
@@ -69,23 +111,173 @@ class PostgresSourceReaderTest {
         assertThat(completedCheckpointIds).containsExactly(101L);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testMultipleSnapshotSplit(boolean skipBackFill) throws Exception {
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("Id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("Name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+        TableId tableId = new TableId(null, SCHEMA_NAME, "Customers");
+        RowType splitType =
+                RowType.of(
+                        new LogicalType[] {DataTypes.INT().getLogicalType()}, 
new String[] {"Id"});
+        List<SnapshotSplit> snapshotSplits =
+                Arrays.asList(
+                        new SnapshotSplit(
+                                tableId,
+                                0,
+                                splitType,
+                                null,
+                                new Integer[] {200},
+                                null,
+                                Collections.emptyMap()),
+                        new SnapshotSplit(
+                                tableId,
+                                1,
+                                splitType,
+                                new Integer[] {200},
+                                new Integer[] {1500},
+                                null,
+                                Collections.emptyMap()),
+                        new SnapshotSplit(
+                                tableId,
+                                2,
+                                splitType,
+                                new Integer[] {1500},
+                                null,
+                                null,
+                                Collections.emptyMap()));
+
+        // Step 1: start source reader and assign snapshot splits
+        PostgresSourceReader reader = createReader(-1, skipBackFill);
+        reader.start();
+        reader.addSplits(snapshotSplits);
+
+        String[] expectedRecords =
+                new String[] {
+                    "+I[111, user_6, Shanghai, 123567891234]",
+                    "+I[110, user_5, Shanghai, 123567891234]",
+                    "+I[101, user_1, Shanghai, 123567891234]",
+                    "+I[103, user_3, Shanghai, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "+I[118, user_7, Shanghai, 123567891234]",
+                    "+I[121, user_8, Shanghai, 123567891234]",
+                    "+I[123, user_9, Shanghai, 123567891234]",
+                    "+I[109, user_4, Shanghai, 123567891234]",
+                    "+I[1009, user_10, Shanghai, 123567891234]",
+                    "+I[1011, user_12, Shanghai, 123567891234]",
+                    "+I[1010, user_11, Shanghai, 123567891234]",
+                    "+I[1013, user_14, Shanghai, 123567891234]",
+                    "+I[1012, user_13, Shanghai, 123567891234]",
+                    "+I[1015, user_16, Shanghai, 123567891234]",
+                    "+I[1014, user_15, Shanghai, 123567891234]",
+                    "+I[1017, user_18, Shanghai, 123567891234]",
+                    "+I[1016, user_17, Shanghai, 123567891234]",
+                    "+I[1019, user_20, Shanghai, 123567891234]",
+                    "+I[1018, user_19, Shanghai, 123567891234]",
+                    "+I[2000, user_21, Shanghai, 123567891234]"
+                };
+        // Step 2: wait the snapshot splits finished reading
+        List<String> actualRecords = consumeSnapshotRecords(reader, dataType);
+        assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
+    }
+
+    private List<String> consumeSnapshotRecords(
+            PostgresSourceReader sourceReader, DataType recordType) throws 
Exception {
+        // Poll all the  records of the multiple assigned snapshot split.
+        sourceReader.notifyNoMoreSplits();
+        final SimpleReaderOutput output = new SimpleReaderOutput();
+        InputStatus status = MORE_AVAILABLE;
+        while (END_OF_INPUT != status) {
+            status = sourceReader.pollNext(output);
+        }
+        final RecordsFormatter formatter = new RecordsFormatter(recordType);
+        return formatter.format(output.getResults());
+    }
+
     private PostgresSourceReader createReader(final int 
lsnCommitCheckpointsDelay)
             throws Exception {
+        return createReader(lsnCommitCheckpointsDelay, false);
+    }
+
+    private PostgresSourceReader createReader(
+            final int lsnCommitCheckpointsDelay, boolean skipBackFill) throws 
Exception {
         final PostgresOffsetFactory offsetFactory = new 
PostgresOffsetFactory();
         final PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
-        configFactory.hostname("host");
-        configFactory.database("pgdb");
-        configFactory.username("username");
-        configFactory.password("password");
-        configFactory.decodingPluginName("pgoutput");
+        configFactory.hostname(customDatabase.getHost());
+        configFactory.port(customDatabase.getDatabasePort());
+        configFactory.database(customDatabase.getDatabaseName());
+        configFactory.tableList(SCHEMA_NAME + ".Customers");
+        configFactory.username(customDatabase.getUsername());
+        configFactory.password(customDatabase.getPassword());
         configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
-        final TestTable customerTable =
-                new TestTable(ResolvedSchema.of(Column.physical("id", 
BIGINT())));
-        final DebeziumDeserializationSchema<?> deserializer = 
customerTable.getDeserializer();
+        configFactory.skipSnapshotBackfill(skipBackFill);
+        configFactory.decodingPluginName("pgoutput");
         MockPostgresDialect dialect = new 
MockPostgresDialect(configFactory.create(0));
         final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
                 new PostgresSourceBuilder.PostgresIncrementalSource<>(
-                        configFactory, checkNotNull(deserializer), 
offsetFactory, dialect);
+                        configFactory, new ForwardDeserializeSchema(), 
offsetFactory, dialect);
         return source.createReader(new TestingReaderContext());
     }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+    private static class SimpleReaderOutput implements 
ReaderOutput<SourceRecord> {
+
+        private final List<SourceRecord> results = new ArrayList<>();
+
+        @Override
+        public void collect(SourceRecord record) {
+            results.add(record);
+        }
+
+        public List<SourceRecord> getResults() {
+            return results;
+        }
+
+        @Override
+        public void collect(SourceRecord record, long timestamp) {
+            collect(record);
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {}
+
+        @Override
+        public void markIdle() {}
+
+        @Override
+        public void markActive() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public SourceOutput<SourceRecord> 
createOutputForSplit(java.lang.String splitId) {
+            return this;
+        }
+
+        @Override
+        public void releaseOutputForSplit(java.lang.String splitId) {}
+    }
+
+    private static class ForwardDeserializeSchema
+            implements DebeziumDeserializationSchema<SourceRecord> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void deserialize(SourceRecord record, Collector<SourceRecord> 
out) throws Exception {
+            out.collect(record);
+        }
+
+        @Override
+        public TypeInformation<SourceRecord> getProducedType() {
+            return TypeInformation.of(SourceRecord.class);
+        }
+    }
 }


Reply via email to