This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new c9a273212 [FLINK-37509][base&mysql] Incremental scan source does not need to
buffer data when skipping backfill is enabled. (#3964)
c9a273212 is described below
commit c9a273212ab04d7ce069c4b8eef8738b4c0079ca
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Apr 25 17:37:45 2025 +0800
[FLINK-37509][base&mysql] Incremental scan source does not need to buffer data
when skipping backfill is enabled. (#3964)
---
.../reader/IncrementalSourceSplitReader.java | 45 ++--
.../external/IncrementalSourceScanFetcher.java | 124 ++++++----
.../mongodb/source/MongoDBFullChangelogITCase.java | 9 +
.../mongodb/source/NewlyAddedTableITCase.java | 2 +
.../mysql/debezium/reader/SnapshotSplitReader.java | 154 +++++++-----
.../mysql/source/reader/MySqlSplitReader.java | 46 ++--
.../mysql/source/split/MySqlRecords.java | 7 +-
.../connectors/mysql/source/MySqlSourceITCase.java | 262 +++++++--------------
.../mysql/source/NewlyAddedTableITCase.java | 2 +
.../mysql/source/reader/MySqlSourceReaderTest.java | 44 +++-
.../mysql/table/MySqlConnectorITCase.java | 15 +-
.../oracle/source/NewlyAddedTableITCase.java | 2 +
.../source/fetch/PostgresScanFetchTask.java | 2 +-
.../postgres/source/NewlyAddedTableITCase.java | 2 +
.../postgres/source/PostgresSourceITCase.java | 16 +-
.../source/fetch/PostgresScanFetchTaskTest.java | 51 +++-
.../source/reader/PostgresSourceReaderTest.java | 222 +++++++++++++++--
17 files changed, 625 insertions(+), 380 deletions(-)
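For context: with 'scan.incremental.snapshot.backfill.skip' enabled, this change lets a
snapshot split's records be forwarded as they are read instead of being buffered until the
high watermark. Below is a minimal, hedged sketch of turning the option on from the
DataStream API; the builder calls mirror the ones in MySqlSourceITCase further down, the
connection settings are placeholders, the package paths are assumed from the repo layout
above, and skipSnapshotBackfill(true) is assumed to be exposed on the builder the same way
MySqlSourceConfigFactory#skipSnapshotBackfill is used in MySqlSourceReaderTest.

    // Sketch only: placeholder settings, not a definitive configuration.
    import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
    import org.apache.flink.cdc.debezium.StringDebeziumDeserializationSchema;

    public class SkipBackfillExample {
        public static void main(String[] args) {
            MySqlSource<String> source =
                    MySqlSource.<String>builder()
                            .hostname("localhost")        // placeholder
                            .port(3306)                   // placeholder
                            .databaseList("mydb")         // placeholder
                            .tableList("mydb.customers")  // placeholder
                            .username("flinkuser")        // placeholder
                            .password("flinkpw")          // placeholder
                            .serverTimeZone("UTC")
                            .deserializer(new StringDebeziumDeserializationSchema())
                            // assumption: same switch as MySqlSourceConfigFactory#skipSnapshotBackfill
                            .skipSnapshotBackfill(true)
                            .build();
            // The source would then be attached via env.fromSource(...), as in the
            // testSourceMetrics test that this commit removes.
        }
    }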
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
index ead3d01c0..84d687005 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
@@ -45,6 +45,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -92,7 +93,6 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
@Override
public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
-
try {
suspendStreamReaderIfNeed();
return pollSplitRecords();
@@ -145,13 +145,13 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
Iterator<SourceRecords> dataIt = null;
if (currentFetcher == null) {
// (1) Reads stream split firstly and then read snapshot split
- if (streamSplits.size() > 0) {
+ if (!streamSplits.isEmpty()) {
// the stream split may come from:
// (a) the initial stream split
// (b) added back stream-split in newly added table process
StreamSplit nextSplit = streamSplits.poll();
submitStreamSplit(nextSplit);
- } else if (snapshotSplits.size() > 0) {
+ } else if (!snapshotSplits.isEmpty()) {
submitSnapshotSplit(snapshotSplits.poll());
} else {
LOG.info("No available split to read.");
@@ -162,19 +162,21 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
} else {
currentSplitId = null;
}
- return dataIt == null ? finishedSplit() : forRecords(dataIt);
+ return dataIt == null ? finishedSplit(true) :
forUnfinishedRecords(dataIt);
} else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
- // (2) try to switch to stream split reading util current snapshot
split finished
dataIt = currentFetcher.pollSplitRecords();
if (dataIt != null) {
// first fetch data of snapshot split, return and emit the
records of snapshot split
- ChangeEventRecords records;
+ return forUnfinishedRecords(dataIt);
+ } else {
+ // (2) try to switch to stream split reading util current
snapshot split finished
+ ChangeEventRecords finishedRecords;
if (context.isHasAssignedStreamSplit()) {
- records = forNewAddedTableFinishedSplit(currentSplitId,
dataIt);
+ finishedRecords =
forNewAddedTableFinishedSplit(currentSplitId);
closeScanFetcher();
closeStreamFetcher();
} else {
- records = forRecords(dataIt);
+ finishedRecords = finishedSplit(false);
SnapshotSplit nextSplit = snapshotSplits.poll();
if (nextSplit != null) {
checkState(reusedScanFetcher != null);
@@ -183,9 +185,7 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
closeScanFetcher();
}
}
- return records;
- } else {
- return finishedSplit();
+ return finishedRecords;
}
} else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
// (3) switch to snapshot split reading if there are newly added
snapshot splits
@@ -203,7 +203,7 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
// null will be returned after receiving suspend stream event
// finish current stream split reading
closeStreamFetcher();
- return finishedSplit();
+ return finishedSplit(true);
}
} else {
throw new IllegalStateException("Unsupported reader type.");
@@ -215,9 +215,12 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
return currentFetcher == null || currentFetcher.isFinished();
}
- private ChangeEventRecords finishedSplit() {
+ private ChangeEventRecords finishedSplit(boolean recycleScanFetcher) {
final ChangeEventRecords finishedRecords =
ChangeEventRecords.forFinishedSplit(currentSplitId);
+ if (recycleScanFetcher) {
+ closeScanFetcher();
+ }
currentSplitId = null;
return finishedRecords;
}
@@ -226,24 +229,16 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
* Finishes new added snapshot split, mark the stream split as finished
too, we will add the
* stream split back in {@code MySqlSourceReader}.
*/
- private ChangeEventRecords forNewAddedTableFinishedSplit(
- final String splitId, final Iterator<SourceRecords>
recordsForSplit) {
+ private ChangeEventRecords forNewAddedTableFinishedSplit(final String
splitId) {
final Set<String> finishedSplits = new HashSet<>();
finishedSplits.add(splitId);
finishedSplits.add(STREAM_SPLIT_ID);
currentSplitId = null;
- return new ChangeEventRecords(splitId, recordsForSplit,
finishedSplits);
+ return new ChangeEventRecords(splitId, Collections.emptyIterator(),
finishedSplits);
}
- private ChangeEventRecords forRecords(Iterator<SourceRecords> dataIt) {
- if (currentFetcher instanceof IncrementalSourceScanFetcher) {
- final ChangeEventRecords finishedRecords =
- ChangeEventRecords.forSnapshotRecords(currentSplitId,
dataIt);
- closeScanFetcher();
- return finishedRecords;
- } else {
- return ChangeEventRecords.forRecords(currentSplitId, dataIt);
- }
+ private ChangeEventRecords forUnfinishedRecords(Iterator<SourceRecords>
dataIt) {
+ return ChangeEventRecords.forRecords(currentSplitId, dataIt);
}
private void submitSnapshotSplit(SnapshotSplit snapshotSplit) {
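The net effect of this hunk is a new split lifecycle: while the scan fetcher keeps
returning batches they are emitted as unfinished records, and only a null poll marks the
split finished; finishedSplit(true/false) additionally decides whether the scan fetcher is
closed or reused. The self-contained sketch below illustrates that contract with
simplified stand-in types; it is not the real reader code.

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;

    public class SplitLifecycleSketch {
        /** Stand-in for IncrementalSourceScanFetcher: returns batches, then null. */
        static class FakeScanFetcher {
            private final Queue<List<String>> batches =
                    new ArrayDeque<>(List.of(List.of("rec-1", "rec-2"), List.of("rec-3")));

            List<String> pollSplitRecords() {
                return batches.poll(); // null once drained, like the real fetcher
            }
        }

        public static void main(String[] args) {
            FakeScanFetcher fetcher = new FakeScanFetcher();
            String currentSplitId = "snapshot-split-0";
            while (true) {
                List<String> batch = fetcher.pollSplitRecords();
                if (batch != null) {
                    // forUnfinishedRecords(...): emit without finishing the split, so a
                    // skip-backfill split can be drained over several fetch() calls.
                    System.out.println("unfinished records for " + currentSplitId + ": " + batch);
                } else {
                    // finishedSplit(...): only now is the split id reported as finished
                    // and the scan fetcher recycled or closed.
                    System.out.println("split finished: " + currentSplitId);
                    break;
                }
            }
        }
    }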
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
index d23d4d36c..3136bb97d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -115,64 +116,87 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
checkReadException();
if (hasNextElement.get()) {
- // eg:
- // data input: [low watermark event][snapshot events][high
watermark event][change
- // events][end watermark event]
- // data output: [low watermark event][normalized events][high
watermark event]
- boolean reachChangeLogStart = false;
- boolean reachChangeLogEnd = false;
- SourceRecord lowWatermark = null;
- SourceRecord highWatermark = null;
- Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
- while (!reachChangeLogEnd) {
- checkReadException();
- List<DataChangeEvent> batch = queue.poll();
- for (DataChangeEvent event : batch) {
- SourceRecord record = event.getRecord();
- if (lowWatermark == null) {
- lowWatermark = record;
- assertLowWatermark(lowWatermark);
- continue;
- }
+ if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+ return pollWithoutBuffer();
+ } else {
+ return pollWithBuffer();
+ }
+ }
+ // the data has been polled, no more data
+ reachEnd.compareAndSet(false, true);
+ return null;
+ }
- if (highWatermark == null && isHighWatermarkEvent(record))
{
- highWatermark = record;
- // snapshot events capture end and begin to capture
stream events
- reachChangeLogStart = true;
- continue;
- }
+ public Iterator<SourceRecords> pollWithoutBuffer() throws
InterruptedException {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ final List<SourceRecord> records = new ArrayList<>();
+ for (DataChangeEvent event : batch) {
+ if (isEndWatermarkEvent(event.getRecord())) {
+ hasNextElement.set(false);
+ break;
+ }
+ records.add(event.getRecord());
+ }
- if (reachChangeLogStart && isEndWatermarkEvent(record)) {
- // capture to end watermark events, stop the loop
- reachChangeLogEnd = true;
- break;
- }
+ return Collections.singletonList(new
SourceRecords(records)).iterator();
+ }
+
+ public Iterator<SourceRecords> pollWithBuffer() throws
InterruptedException {
+ // eg:
+ // data input: [low watermark event][snapshot events][high watermark
event][change
+ // events][end watermark event]
+ // data output: [low watermark event][normalized events][high
watermark event]
+ boolean reachChangeLogStart = false;
+ boolean reachChangeLogEnd = false;
+ SourceRecord lowWatermark = null;
+ SourceRecord highWatermark = null;
+ Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+ while (!reachChangeLogEnd) {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ for (DataChangeEvent event : batch) {
+ SourceRecord record = event.getRecord();
+ if (lowWatermark == null) {
+ lowWatermark = record;
+ assertLowWatermark(lowWatermark);
+ continue;
+ }
+
+ if (highWatermark == null && isHighWatermarkEvent(record)) {
+ highWatermark = record;
+ // snapshot events capture end and begin to capture stream
events
+ reachChangeLogStart = true;
+ continue;
+ }
+
+ if (reachChangeLogStart && isEndWatermarkEvent(record)) {
+ // capture to end watermark events, stop the loop
+ reachChangeLogEnd = true;
+ break;
+ }
- if (!reachChangeLogStart) {
- outputBuffer.put((Struct) record.key(), record);
- } else {
- if (isChangeRecordInChunkRange(record)) {
- // rewrite overlapping snapshot records through
the record key
- taskContext.rewriteOutputBuffer(outputBuffer,
record);
- }
+ if (!reachChangeLogStart) {
+ outputBuffer.put((Struct) record.key(), record);
+ } else {
+ if (isChangeRecordInChunkRange(record)) {
+ // rewrite overlapping snapshot records through the
record key
+ taskContext.rewriteOutputBuffer(outputBuffer, record);
}
}
}
- // snapshot split return its data once
- hasNextElement.set(false);
+ }
+ // snapshot split return its data once
+ hasNextElement.set(false);
- final List<SourceRecord> normalizedRecords = new ArrayList<>();
- normalizedRecords.add(lowWatermark);
-
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
- normalizedRecords.add(highWatermark);
+ final List<SourceRecord> normalizedRecords = new ArrayList<>();
+ normalizedRecords.add(lowWatermark);
+
normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
+ normalizedRecords.add(highWatermark);
- final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
- sourceRecordsSet.add(new SourceRecords(normalizedRecords));
- return sourceRecordsSet.iterator();
- }
- // the data has been polled, no more data
- reachEnd.compareAndSet(false, true);
- return null;
+ final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+ sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+ return sourceRecordsSet.iterator();
}
private void checkReadException() {
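The split into pollWithBuffer() and pollWithoutBuffer() above is the core of the change:
with backfill, snapshot records must be held back and rewritten by the back-filled change
events between the low and high watermarks before one normalized batch is emitted; with
backfill skipped there are no change events between the high and end watermarks, so
buffering buys nothing and each queue batch can be forwarded as it arrives. The sketch
below is a simplified, self-contained model of the two paths using plain strings as
stand-ins for watermark and data events; it is not the real SourceRecord handling.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PollModesSketch {
        // pollWithBuffer-style: buffer keyed records between the watermarks, let later
        // change events overwrite them, and emit one normalized, watermark-framed batch.
        static List<String> pollWithBuffer(List<String> input) {
            Map<String, String> buffer = new LinkedHashMap<>();
            String low = null;
            String high = null;
            boolean afterHigh = false;
            for (String event : input) {
                if (low == null) { low = event; continue; }                  // low watermark
                if (!afterHigh && event.equals("HIGH_WM")) { high = event; afterHigh = true; continue; }
                if (afterHigh && event.equals("END_WM")) { break; }          // end watermark
                buffer.put(event.split("=")[0], event);                      // insert or backfill rewrite
            }
            List<String> out = new ArrayList<>();
            out.add(low);
            out.addAll(buffer.values());
            out.add(high);
            return out;
        }

        // pollWithoutBuffer-style: forward records as they come, stop at the end watermark.
        static List<String> pollWithoutBuffer(List<String> batch) {
            List<String> out = new ArrayList<>();
            for (String event : batch) {
                if (event.equals("END_WM")) { break; }
                out.add(event);
            }
            return out;
        }

        public static void main(String[] args) {
            List<String> withBackfill = List.of("LOW_WM", "k1=a", "k2=b", "HIGH_WM", "k1=a2", "END_WM");
            System.out.println(pollWithBuffer(withBackfill));    // [LOW_WM, k1=a2, k2=b, HIGH_WM]
            // With backfill skipped, the stream carries no events between HIGH_WM and
            // END_WM, so there is nothing to rewrite and pass-through is safe.
            List<String> skipped = List.of("LOW_WM", "k1=a", "k2=b", "HIGH_WM", "END_WM");
            System.out.println(pollWithoutBuffer(skipped));      // [LOW_WM, k1=a, k2=b, HIGH_WM]
        }
    }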
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 257f7ce1f..659c68604 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -517,6 +517,15 @@ class MongoDBFullChangelogITCase extends
MongoDBSourceTestBase {
mongoCollection.updateOne(
Filters.eq("cid", 2000L), Updates.set("address",
"Pittsburgh"));
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
+
+ // Rarely happens, but if there's no operation or
heartbeat events between
+ // watermark #a (the ChangeStream opLog caused by the last
event in this hook)
+ // and watermark #b (the calculated high watermark that
limits the bounded
+ // back-filling stream fetch task), the last event of hook
will be missed since
+ // back-filling task reads between [loW, hiW) (high
watermark not included).
+ // Workaround: insert a dummy event in another collection
to forcefully push
+ // opLog forward.
+ database.getCollection("customers_1").insertOne(new
Document());
};
switch (hookType) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
index 606bc9911..81f3d2c84 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
@@ -786,6 +786,8 @@ class NewlyAddedTableITCase extends MongoDBSourceTestBase {
waitForUpsertSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList,
TestValuesTableFactory.getResultsAsStrings("sink"));
+ // Wait 1s until snapshot phase finished, make sure the binlog
data is not lost.
+ Thread.sleep(1000L);
// step 3: make some changelog data for this round
makeFirstPartOplogForAddressCollection(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index b86e6e527..b7b23a18e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -282,79 +282,103 @@ public class SnapshotSplitReader implements
DebeziumReader<SourceRecords, MySqlS
checkReadException();
if (hasNextElement.get()) {
- // data input: [low watermark event][snapshot events][high
watermark event][binlog
- // events][binlog-end event]
- // data output: [low watermark event][normalized events][high
watermark event]
- boolean reachBinlogStart = false;
- boolean reachBinlogEnd = false;
- SourceRecord lowWatermark = null;
- SourceRecord highWatermark = null;
-
- Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
- while (!reachBinlogEnd) {
- checkReadException();
- List<DataChangeEvent> batch = queue.poll();
- for (DataChangeEvent event : batch) {
- SourceRecord record = event.getRecord();
- if (lowWatermark == null) {
- lowWatermark = record;
- assertLowWatermark(lowWatermark);
- continue;
- }
+ if
(statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+ return pollWithoutBuffer();
+ } else {
+ return pollWithBuffer();
+ }
+ }
- if (highWatermark == null &&
RecordUtils.isHighWatermarkEvent(record)) {
- highWatermark = record;
- // snapshot events capture end and begin to capture
binlog events
- reachBinlogStart = true;
- continue;
- }
+ // the data has been polled, no more data
+ reachEnd.compareAndSet(false, true);
+ return null;
+ }
- if (reachBinlogStart &&
RecordUtils.isEndWatermarkEvent(record)) {
- // capture to end watermark events, stop the loop
- reachBinlogEnd = true;
- break;
- }
+ public Iterator<SourceRecords> pollWithoutBuffer() throws
InterruptedException {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ final List<SourceRecord> records = new ArrayList<>();
+ for (DataChangeEvent event : batch) {
+ if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
+ hasNextElement.set(false);
+ break;
+ }
+ records.add(event.getRecord());
+ }
- if (!reachBinlogStart) {
- if (record.key() != null) {
- snapshotRecords.put(
- (Struct) record.key(),
Collections.singletonList(record));
- } else {
- List<SourceRecord> records =
- snapshotRecords.computeIfAbsent(
- (Struct) record.value(), key ->
new LinkedList<>());
- records.add(record);
- }
+ return Collections.singletonList(new
SourceRecords(records)).iterator();
+ }
+
+ public Iterator<SourceRecords> pollWithBuffer() throws
InterruptedException {
+ // data input: [low watermark event][snapshot events][high watermark
event][binlog
+ // events][binlog-end event]
+ // data output: [low watermark event][normalized events][high
watermark event]
+ boolean reachBinlogStart = false;
+ boolean reachBinlogEnd = false;
+ SourceRecord lowWatermark = null;
+ SourceRecord highWatermark = null;
+
+ Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
+ while (!reachBinlogEnd) {
+ checkReadException();
+ List<DataChangeEvent> batch = queue.poll();
+ for (DataChangeEvent event : batch) {
+ SourceRecord record = event.getRecord();
+ if (lowWatermark == null) {
+ lowWatermark = record;
+ assertLowWatermark(lowWatermark);
+ continue;
+ }
+
+ if (highWatermark == null &&
RecordUtils.isHighWatermarkEvent(record)) {
+ highWatermark = record;
+ // snapshot events capture end and begin to capture binlog
events
+ reachBinlogStart = true;
+ continue;
+ }
+
+ if (reachBinlogStart &&
RecordUtils.isEndWatermarkEvent(record)) {
+ // capture to end watermark events, stop the loop
+ reachBinlogEnd = true;
+ break;
+ }
+
+ if (!reachBinlogStart) {
+ if (record.key() != null) {
+ snapshotRecords.put(
+ (Struct) record.key(),
Collections.singletonList(record));
} else {
- RecordUtils.upsertBinlog(
- snapshotRecords,
- record,
- currentSnapshotSplit.getSplitKeyType(),
- nameAdjuster,
- currentSnapshotSplit.getSplitStart(),
- currentSnapshotSplit.getSplitEnd());
+ List<SourceRecord> records =
+ snapshotRecords.computeIfAbsent(
+ (Struct) record.value(), key -> new
LinkedList<>());
+ records.add(record);
}
+ } else {
+ RecordUtils.upsertBinlog(
+ snapshotRecords,
+ record,
+ currentSnapshotSplit.getSplitKeyType(),
+ nameAdjuster,
+ currentSnapshotSplit.getSplitStart(),
+ currentSnapshotSplit.getSplitEnd());
}
}
- // snapshot split return its data once
- hasNextElement.set(false);
-
- final List<SourceRecord> normalizedRecords = new ArrayList<>();
- normalizedRecords.add(lowWatermark);
- normalizedRecords.addAll(
- RecordUtils.formatMessageTimestamp(
- snapshotRecords.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList())));
- normalizedRecords.add(highWatermark);
-
- final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
- sourceRecordsSet.add(new SourceRecords(normalizedRecords));
- return sourceRecordsSet.iterator();
}
- // the data has been polled, no more data
- reachEnd.compareAndSet(false, true);
- return null;
+ // snapshot split return its data once
+ hasNextElement.set(false);
+
+ final List<SourceRecord> normalizedRecords = new ArrayList<>();
+ normalizedRecords.add(lowWatermark);
+ normalizedRecords.addAll(
+ RecordUtils.formatMessageTimestamp(
+ snapshotRecords.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList())));
+ normalizedRecords.add(highWatermark);
+
+ final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+ sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+ return sourceRecordsSet.iterator();
}
private void checkReadException() {
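One MySQL-specific detail preserved in pollWithBuffer() above: tables without a primary
key produce records whose key() is null, so the buffer maps them by value to a list
instead of collapsing them by key. A tiny stand-in illustration follows, using plain
strings in place of Struct/SourceRecord.

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    public class NoPkBufferSketch {
        public static void main(String[] args) {
            Map<String, List<String>> snapshotRecords = new LinkedHashMap<>();

            // Keyed table: one entry per key, a later write replaces the earlier one.
            snapshotRecords.put("pk=1", Collections.singletonList("row(pk=1, v=a)"));
            snapshotRecords.put("pk=1", Collections.singletonList("row(pk=1, v=a2)"));

            // No-PK table (record.key() == null): the value acts as the map key and
            // duplicate rows accumulate in the list instead of being collapsed.
            snapshotRecords.computeIfAbsent("row(v=b)", k -> new LinkedList<>()).add("row(v=b)");
            snapshotRecords.computeIfAbsent("row(v=b)", k -> new LinkedList<>()).add("row(v=b)");

            System.out.println(snapshotRecords.get("pk=1"));      // [row(pk=1, v=a2)]
            System.out.println(snapshotRecords.get("row(v=b)"));  // [row(v=b), row(v=b)]
        }
    }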
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
index e1178982b..61648756d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -102,7 +103,7 @@ public class MySqlSplitReader implements
SplitReader<SourceRecords, MySqlSplit>
Iterator<SourceRecords> dataIt;
if (currentReader == null) {
// (1) Reads binlog split firstly and then read snapshot split
- if (binlogSplits.size() > 0) {
+ if (!binlogSplits.isEmpty()) {
// the binlog split may come from:
// (a) the initial binlog split
// (b) added back binlog-split in newly added table process
@@ -110,7 +111,7 @@ public class MySqlSplitReader implements
SplitReader<SourceRecords, MySqlSplit>
currentSplitId = nextSplit.splitId();
currentReader = getBinlogSplitReader();
currentReader.submitSplit(nextSplit);
- } else if (snapshotSplits.size() > 0) {
+ } else if (!snapshotSplits.isEmpty()) {
MySqlSplit nextSplit = snapshotSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getSnapshotSplitReader();
@@ -119,19 +120,21 @@ public class MySqlSplitReader implements
SplitReader<SourceRecords, MySqlSplit>
LOG.info("No available split to read.");
}
dataIt = currentReader.pollSplitRecords();
- return dataIt == null ? finishedSplit() : forRecords(dataIt);
+ return dataIt == null ? finishedSplit(true) :
forUnfinishedRecords(dataIt);
} else if (currentReader instanceof SnapshotSplitReader) {
- // (2) try to switch to binlog split reading util current snapshot
split finished
dataIt = currentReader.pollSplitRecords();
if (dataIt != null) {
// first fetch data of snapshot split, return and emit the
records of snapshot split
- MySqlRecords records;
+ return forUnfinishedRecords(dataIt);
+ } else {
+ // (2) try to switch to binlog split reading util current
snapshot split finished
+ MySqlRecords finishedRecords;
if (context.isHasAssignedBinlogSplit()) {
- records = forNewAddedTableFinishedSplit(currentSplitId,
dataIt);
+ finishedRecords =
forNewAddedTableFinishedSplit(currentSplitId);
closeSnapshotReader();
closeBinlogReader();
} else {
- records = forRecords(dataIt);
+ finishedRecords = finishedSplit(false);
MySqlSplit nextSplit = snapshotSplits.poll();
if (nextSplit != null) {
currentSplitId = nextSplit.splitId();
@@ -140,9 +143,7 @@ public class MySqlSplitReader implements
SplitReader<SourceRecords, MySqlSplit>
closeSnapshotReader();
}
}
- return records;
- } else {
- return finishedSplit();
+ return finishedRecords;
}
} else if (currentReader instanceof BinlogSplitReader) {
// (3) switch to snapshot split reading if there are newly added
snapshot splits
@@ -157,46 +158,41 @@ public class MySqlSplitReader implements
SplitReader<SourceRecords, MySqlSplit>
currentReader = getSnapshotSplitReader();
currentReader.submitSplit(nextSplit);
}
- return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+ return MySqlRecords.forUnfinishedRecords(BINLOG_SPLIT_ID,
dataIt);
} else {
// null will be returned after receiving suspend binlog event
// finish current binlog split reading
closeBinlogReader();
- return finishedSplit();
+ return finishedSplit(true);
}
} else {
throw new IllegalStateException("Unsupported reader type.");
}
}
- private MySqlRecords finishedSplit() {
+ private MySqlRecords finishedSplit(boolean recycleScanFetcher) {
final MySqlRecords finishedRecords =
MySqlRecords.forFinishedSplit(currentSplitId);
+ if (recycleScanFetcher) {
+ closeSnapshotReader();
+ }
currentSplitId = null;
return finishedRecords;
}
- private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
- if (currentReader instanceof SnapshotSplitReader) {
- final MySqlRecords finishedRecords =
- MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
- closeSnapshotReader();
- return finishedRecords;
- } else {
- return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
- }
+ private MySqlRecords forUnfinishedRecords(Iterator<SourceRecords> dataIt) {
+ return MySqlRecords.forUnfinishedRecords(currentSplitId, dataIt);
}
/**
* Finishes new added snapshot split, mark the binlog split as finished
too, we will add the
* binlog split back in {@code MySqlSourceReader}.
*/
- private MySqlRecords forNewAddedTableFinishedSplit(
- final String splitId, final Iterator<SourceRecords>
recordsForSplit) {
+ private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
final Set<String> finishedSplits = new HashSet<>();
finishedSplits.add(splitId);
finishedSplits.add(BINLOG_SPLIT_ID);
currentSplitId = null;
- return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
+ return new MySqlRecords(splitId, Collections.emptyIterator(),
finishedSplits);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
index 9d6192aa5..4b6145237 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java
@@ -76,16 +76,11 @@ public final class MySqlRecords implements
RecordsWithSplitIds<SourceRecords> {
return finishedSnapshotSplits;
}
- public static MySqlRecords forBinlogRecords(
+ public static MySqlRecords forUnfinishedRecords(
final String splitId, final Iterator<SourceRecords>
recordsForSplit) {
return new MySqlRecords(splitId, recordsForSplit,
Collections.emptySet());
}
- public static MySqlRecords forSnapshotRecords(
- final String splitId, final Iterator<SourceRecords>
recordsForSplit) {
- return new MySqlRecords(splitId, recordsForSplit,
Collections.singleton(splitId));
- }
-
public static MySqlRecords forFinishedSplit(final String splitId) {
return new MySqlRecords(null, null, Collections.singleton(splitId));
}
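With forSnapshotRecords removed, a snapshot split's data and its completion signal now
travel in separate fetches: forUnfinishedRecords ships records with an empty
finished-split set, while forFinishedSplit ships no records but reports the split id as
finished. The sketch below models only that container contract with simplified types; it
is not the real RecordsWithSplitIds implementation.

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.Set;

    public class RecordsSketch {
        // Stand-in for the (splitId, records, finishedSplits) triple carried by MySqlRecords.
        record Batch(String splitId, Iterator<String> records, Set<String> finishedSplits) {}

        static Batch forUnfinishedRecords(String splitId, Iterator<String> records) {
            return new Batch(splitId, records, Collections.emptySet());
        }

        static Batch forFinishedSplit(String splitId) {
            return new Batch(null, Collections.emptyIterator(), Collections.singleton(splitId));
        }

        public static void main(String[] args) {
            Batch data = forUnfinishedRecords(
                    "snapshot-split-0", Collections.singletonList("+I[101, user_1]").iterator());
            Batch done = forFinishedSplit("snapshot-split-0");
            System.out.println(data.finishedSplits()); // [] -> reader keeps the split open
            System.out.println(done.finishedSplits()); // [snapshot-split-0] -> split completes
        }
    }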
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 274ffaacd..d53307d7c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -168,7 +168,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
public static Stream<Arguments> parameters() {
return Stream.of(
- Arguments.of("customers", null, null),
+ Arguments.of("customers", null, "false"),
Arguments.of("customers", "id", "true"),
Arguments.of("customers_no_pk", "id", "true"));
}
@@ -184,13 +184,18 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
void testReadSingleTableWithSingleParallelismAndSkipBackFill(
String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
+ HashMap<String, String> option = new HashMap<>();
+ option.put("scan.incremental.snapshot.backfill.skip", "true");
+ option.put("scan.incremental.snapshot.chunk.size", "1");
testMySqlParallelSource(
1,
DEFAULT_SCAN_STARTUP_MODE,
@@ -198,10 +203,11 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
FailoverPhase.NEVER,
new String[] {tableName},
RestartStrategies.fixedDelayRestart(1, 0),
- true,
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -215,7 +221,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -229,7 +237,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName, "customers_1"},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -243,7 +253,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName, "customers_1"},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
// Failover tests
@@ -257,7 +269,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName, "customers_1"},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -270,7 +284,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName, "customers_1"},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -286,7 +302,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
RestartStrategies.fixedDelayRestart(1, 0),
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -299,7 +317,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName, "customers_1"},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -312,7 +332,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName, "customers_1"},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -328,7 +350,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
RestartStrategies.fixedDelayRestart(1, 0),
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -342,7 +366,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@@ -356,7 +382,26 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
new String[] {tableName},
tableName,
chunkColumnName,
- assignEndingFirst);
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @Test
+ public void testReadSingleTableMutilpleFetch() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("debezium.snapshot.fetch.size", "2");
+ options.put("debezium.max.batch.size", "3");
+ testMySqlParallelSource(
+ 1,
+ DEFAULT_SCAN_STARTUP_MODE,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ new String[] {"customers"},
+ RestartStrategies.fixedDelayRestart(1, 0),
+ "customers",
+ "id",
+ options);
}
@ParameterizedTest
@@ -452,10 +497,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
StartupOptions.latest(), Collections.emptyList(), tableName,
chunkColumnName);
}
- @ParameterizedTest
- @MethodSource("parameters")
- void testSnapshotOnlyModeWithDMLPostHighWaterMark(String tableName, String
chunkColumnName)
- throws Exception {
+ @Test
+ void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
false, 21, USE_POST_HIGHWATERMARK_HOOK,
StartupOptions.snapshot());
@@ -485,10 +528,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @ParameterizedTest
- @MethodSource("parameters")
- void testSnapshotOnlyModeWithDMLPreHighWaterMark(String tableName, String
chunkColumnName)
- throws Exception {
+ @Test
+ void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
false, 21, USE_PRE_HIGHWATERMARK_HOOK,
StartupOptions.snapshot());
@@ -520,10 +561,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @ParameterizedTest
- @MethodSource("parameters")
- void testEnableBackfillWithDMLPreHighWaterMark(String tableName, String
chunkColumnName)
- throws Exception {
+ @Test
+ void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
@@ -557,10 +596,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @ParameterizedTest
- @MethodSource("parameters")
- void testEnableBackfillWithDMLPostLowWaterMark(String tableName, String
chunkColumnName)
- throws Exception {
+ @Test
+ void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
@@ -594,10 +631,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @ParameterizedTest
- @MethodSource("parameters")
- void testSkipBackfillWithDMLPreHighWaterMark(String tableName, String
chunkColumnName)
- throws Exception {
+ @Test
+ void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
@@ -635,10 +670,8 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @ParameterizedTest
- @MethodSource("parameters")
- void testSkipBackfillWithDMLPostLowWaterMark(String tableName, String
chunkColumnName)
- throws Exception {
+ @Test
+ void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
@@ -677,106 +710,6 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @SuppressWarnings("unchecked")
- @ParameterizedTest
- @MethodSource("parameters")
- void testSourceMetrics(String tableName, String chunkColumnName) throws
Exception {
- customDatabase.createAndInitialize();
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- MySqlSource<String> source =
- MySqlSource.<String>builder()
- .hostname(MYSQL_CONTAINER.getHost())
- .port(MYSQL_CONTAINER.getDatabasePort())
- .databaseList(customDatabase.getDatabaseName())
- .tableList(customDatabase.getDatabaseName() +
".customers")
- .username(customDatabase.getUsername())
- .password(customDatabase.getPassword())
- .deserializer(new
StringDebeziumDeserializationSchema())
- .serverId(getServerId())
- .serverTimeZone("UTC")
- .build();
- DataStreamSource<String> stream =
- env.fromSource(source, WatermarkStrategy.noWatermarks(),
"MySQL CDC Source");
- CollectResultIterator<String> iterator = addCollector(env, stream);
- JobClient jobClient = env.executeAsync();
- iterator.setJobClient(jobClient);
-
- // ---------------------------- Snapshot phase
------------------------------
- // Wait until we receive all 21 snapshot records
- int numSnapshotRecordsExpected = 21;
- int numSnapshotRecordsReceived = 0;
- while (numSnapshotRecordsReceived < numSnapshotRecordsExpected &&
iterator.hasNext()) {
- iterator.next();
- numSnapshotRecordsReceived++;
- }
-
- // Check metrics
- List<OperatorMetricGroup> metricGroups =
- metricReporter.findOperatorMetricGroups(jobClient.getJobID(),
"MySQL CDC Source");
- // There should be only 1 parallelism of source, so it's safe to get
the only group
- OperatorMetricGroup group = metricGroups.get(0);
- Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);
-
- // numRecordsOut
-
Assertions.assertThat(group.getIOMetricGroup().getNumRecordsOutCounter().getCount())
- .isEqualTo(numSnapshotRecordsExpected);
-
- // currentEmitEventTimeLag should be UNDEFINED during snapshot phase
-
Assertions.assertThat(metrics).containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
- Gauge<Long> currentEmitEventTimeLag =
- (Gauge<Long>)
metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
- Assertions.assertThat((long) currentEmitEventTimeLag.getValue())
- .isEqualTo(InternalSourceReaderMetricGroup.UNDEFINED);
-
- // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
-
Assertions.assertThat(metrics).containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
- Gauge<Long> currentFetchEventTimeLag =
- (Gauge<Long>)
metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
- Assertions.assertThat((long) currentFetchEventTimeLag.getValue())
- .isEqualTo(MySqlSourceReaderMetrics.UNDEFINED);
-
- // sourceIdleTime should be positive (we can't know the exact value)
-
Assertions.assertThat(metrics).containsKey(MetricNames.SOURCE_IDLE_TIME);
- Gauge<Long> sourceIdleTime = (Gauge<Long>)
metrics.get(MetricNames.SOURCE_IDLE_TIME);
- Assertions.assertThat(sourceIdleTime.getValue())
- .isPositive()
- .isLessThan(TIMEOUT.toMillis());
-
- // --------------------------------- Binlog phase
-----------------------------
- makeFirstPartBinlogEvents(getConnection(),
customDatabase.qualifiedTableName("customers"));
- // Wait until we receive 4 changes made above
- int numBinlogRecordsExpected = 4;
- int numBinlogRecordsReceived = 0;
- while (numBinlogRecordsReceived < numBinlogRecordsExpected &&
iterator.hasNext()) {
- iterator.next();
- numBinlogRecordsReceived++;
- }
-
- // Check metrics
- // numRecordsOut
-
Assertions.assertThat(group.getIOMetricGroup().getNumRecordsOutCounter().getCount())
- .isEqualTo(numSnapshotRecordsExpected +
numBinlogRecordsExpected);
-
- // currentEmitEventTimeLag should be reasonably positive (we can't
know the exact value)
- Assertions.assertThat(currentEmitEventTimeLag.getValue())
- .isPositive()
- .isLessThan(TIMEOUT.toMillis());
-
- // currentEmitEventTimeLag should be reasonably positive (we can't
know the exact value)
- Assertions.assertThat(currentFetchEventTimeLag.getValue())
- .isPositive()
- .isLessThan(TIMEOUT.toMillis());
-
- // currentEmitEventTimeLag should be reasonably positive (we can't
know the exact value)
- Assertions.assertThat(sourceIdleTime.getValue())
- .isPositive()
- .isLessThan(TIMEOUT.toMillis());
-
- jobClient.cancel().get();
- iterator.close();
- }
-
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,
@@ -1077,7 +1010,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
String[] captureCustomerTables,
String tableName,
String chunkColumnName,
- String assignEndingFirst)
+ Map<String, String> options)
throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
@@ -1086,7 +1019,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
captureCustomerTables,
tableName,
chunkColumnName,
- assignEndingFirst);
+ options);
}
private void testMySqlParallelSource(
@@ -1096,7 +1029,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
String[] captureCustomerTables,
String tableName,
String chunkColumnName,
- String assignEndingFirst)
+ Map<String, String> options)
throws Exception {
testMySqlParallelSource(
parallelism,
@@ -1107,31 +1040,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
RestartStrategies.fixedDelayRestart(1, 0),
tableName,
chunkColumnName,
- assignEndingFirst);
- }
-
- private void testMySqlParallelSource(
- int parallelism,
- String scanStartupMode,
- FailoverType failoverType,
- FailoverPhase failoverPhase,
- String[] captureCustomerTables,
- RestartStrategies.RestartStrategyConfiguration
restartStrategyConfiguration,
- String tableName,
- String chunkColumnName,
- String assignEndingFirst)
- throws Exception {
- testMySqlParallelSource(
- parallelism,
- scanStartupMode,
- failoverType,
- failoverPhase,
- captureCustomerTables,
- restartStrategyConfiguration,
- false,
- tableName,
- chunkColumnName,
- assignEndingFirst);
+ options);
}
private void testMySqlParallelSource(
@@ -1141,10 +1050,9 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
FailoverPhase failoverPhase,
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration
restartStrategyConfiguration,
- boolean skipSnapshotBackfill,
String tableName,
String chunkColumnName,
- String assignEndingFirst)
+ Map<String, String> otherOptions)
throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1174,9 +1082,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' =
'100',"
- + " 'scan.incremental.snapshot.backfill.skip'
= '%s',"
+ " 'server-time-zone' = 'UTC',"
- + "
'scan.incremental.snapshot.unbounded-chunk-first.enabled' = '%s',"
+ " 'server-id' = '%s'"
+ " %s"
+ ")",
@@ -1187,14 +1093,22 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
customDatabase.getDatabaseName(),
getTableNameRegex(captureCustomerTables),
scanStartupMode,
- skipSnapshotBackfill,
- assignEndingFirst == null ? "false" : "true",
getServerId(),
chunkColumnName == null
? ""
: String.format(
",
'scan.incremental.snapshot.chunk.key-column' = '%s'",
- chunkColumnName));
+ chunkColumnName),
+ otherOptions.isEmpty()
+ ? ""
+ : ","
+ + otherOptions.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+
"'%s'='%s'",
+
e.getKey(), e.getValue()))
+
.collect(Collectors.joining(",")));
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
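The test refactoring above replaces fixed boolean/option parameters with a free-form
options map that is rendered into the WITH clause. The helper below reproduces just that
rendering step so the resulting DDL fragment is easy to see; the option keys are examples
taken from the tests, nothing more.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class OptionsToDdlSketch {
        // Renders a map of connector options as the trailing ",'k'='v',..." fragment of
        // the WITH clause; an empty map contributes nothing.
        static String render(Map<String, String> otherOptions) {
            return otherOptions.isEmpty()
                    ? ""
                    : ","
                            + otherOptions.entrySet().stream()
                                    .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                                    .collect(Collectors.joining(","));
        }

        public static void main(String[] args) {
            Map<String, String> options = new LinkedHashMap<>();
            options.put("scan.incremental.snapshot.backfill.skip", "true");
            options.put("debezium.max.batch.size", "3");
            System.out.println(render(options));
            // ,'scan.incremental.snapshot.backfill.skip'='true','debezium.max.batch.size'='3'
        }
    }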
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
index 9495234bb..692bd866f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
@@ -831,6 +831,8 @@ class NewlyAddedTableITCase extends MySqlSourceTestBase {
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(
fetchedDataList,
TestValuesTableFactory.getResultsAsStrings("sink"));
+ // Wait 1s until snapshot phase finished, make sure the binlog
data is not lost.
+ Thread.sleep(1000L);
// step 3: make some binlog data for this round
makeFirstPartBinlogForAddressTable(getConnection(),
newlyAddedTable);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index ae3d78c5d..3f0cd2a2e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -69,6 +69,8 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,6 +103,7 @@ import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isH
import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static
org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
+import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
import static org.apache.flink.util.Preconditions.checkState;
@@ -184,8 +187,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
"+I[2000, user_21, Shanghai, 123567891234]"
};
// Step 2: wait the snapshot splits finished reading
- Thread.sleep(5000L);
- List<String> actualRecords = consumeRecords(reader, dataType);
+ List<String> actualRecords = consumeSnapshotRecords(reader, dataType);
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
reader.handleSourceEvents(
new FinishedSnapshotSplitsAckEvent(
@@ -213,10 +215,12 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
restartReader.close();
}
- @Test
- void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testFinishedUnackedSplitsUsingStateFromSnapshotPhase(boolean
skipBackFill)
+ throws Exception {
customerDatabase.createAndInitialize();
- final MySqlSourceConfig sourceConfig = getConfig(new String[]
{"customers"});
+ final MySqlSourceConfig sourceConfig = getConfig(new String[]
{"customers"}, skipBackFill);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
@@ -294,8 +298,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
"+I[2000, user_21, Shanghai, 123567891234]"
};
// Step 2: wait the snapshot splits finished reading
- Thread.sleep(5000L);
- List<String> actualRecords = consumeRecords(reader, dataType);
+ List<String> actualRecords = consumeSnapshotRecords(reader, dataType);
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
// Step 3: snapshot reader's state
@@ -352,7 +355,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
"+U[103, user_3, Hangzhou, 123567891234]"
};
// the 2 records are produced by 1 operations
- List<String> actualRecords = consumeRecords(reader, dataType);
+ List<String> actualRecords = consumeBinlogRecords(reader, dataType);
assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords);
List<MySqlSplit> splitsState = reader.snapshotState(1L);
// check the binlog split state
@@ -373,7 +376,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
"+U[103, user_3, Shanghai, 123567891234]"
};
// the 4 records are produced by 3 operations
- List<String> restRecords = consumeRecords(restartReader, dataType);
+ List<String> restRecords = consumeBinlogRecords(restartReader,
dataType);
assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords);
restartReader.close();
}
@@ -610,6 +613,10 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
}
private MySqlSourceConfig getConfig(String[] captureTables) {
+ return getConfig(captureTables, false);
+ }
+
+ private MySqlSourceConfig getConfig(String[] captureTables, boolean
skipBackFill) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() +
"." + tableName)
@@ -627,21 +634,34 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
+ .skipSnapshotBackfill(skipBackFill)
.createConfig(0);
}
- private List<String> consumeRecords(
+ private List<String> consumeSnapshotRecords(
MySqlSourceReader<SourceRecord> sourceReader, DataType recordType)
throws Exception {
- // Poll all the n records of the single split.
+ // Poll all the records of the multiple assigned snapshot split.
+ sourceReader.notifyNoMoreSplits();
final SimpleReaderOutput output = new SimpleReaderOutput();
InputStatus status = MORE_AVAILABLE;
- while (MORE_AVAILABLE == status || output.getResults().size() == 0) {
+ while (END_OF_INPUT != status) {
status = sourceReader.pollNext(output);
}
final RecordsFormatter formatter = new RecordsFormatter(recordType);
return formatter.format(output.getResults());
}
+ private List<String> consumeBinlogRecords(
+ MySqlSourceReader<SourceRecord> sourceReader, DataType recordType)
throws Exception {
+ // Poll one batch records of the binlog split.
+ final SimpleReaderOutput output = new SimpleReaderOutput();
+ while (output.getResults().isEmpty()) {
+ sourceReader.pollNext(output);
+ }
+ final RecordsFormatter formatter = new RecordsFormatter(recordType);
+ return formatter.format(output.getResults());
+ }
+
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 7ec84611a..7cc5fe586 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -1419,6 +1419,9 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
Deadline.fromNow(Duration.ofSeconds(10)));
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
+ // Wait 1s until snapshot phase finished, cannot update DDL during
snapshot phase.
+ List<String> actualRows = new ArrayList<>(fetchRows(iterator, 2));
+ Thread.sleep(1000L);
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM default_value_test WHERE id=1;");
@@ -1506,7 +1509,8 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
+ " tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 '"
+ " );");
}
- assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator,
expected.length));
+ actualRows.addAll(fetchRows(iterator, expected.length - 2));
+ assertEqualsInAnyOrder(Arrays.asList(expected), actualRows);
jobClient.cancel().get();
}
@@ -1554,6 +1558,10 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
Deadline.fromNow(Duration.ofSeconds(10)));
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
+ // Wait 1s until snapshot phase finished, cannot update DDL during
snapshot phase.
+ List<String> actualRows = new ArrayList<>(fetchRows(iterator, 2));
+ Thread.sleep(1000L);
+
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM default_value_test WHERE id=1;");
@@ -1572,7 +1580,8 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
statement.execute(
"alter table default_value_test add column `int_test` INT
DEFAULT ' 30 ';");
}
- assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator,
expected.length));
+ actualRows.addAll(fetchRows(iterator, expected.length - 2));
+ assertEqualsInAnyOrder(Arrays.asList(expected), actualRows);
jobClient.cancel().get();
}
@@ -2252,7 +2261,7 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
private static void waitForSnapshotStarted(CloseableIterator<Row>
iterator) throws Exception {
while (!iterator.hasNext()) {
- Thread.sleep(100);
+ Thread.sleep(1000);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
index 3cf295501..5dab6cd6f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
@@ -687,6 +687,8 @@ class NewlyAddedTableITCase extends OracleSourceTestBase {
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(
fetchedDataList,
TestValuesTableFactory.getResultsAsStrings("sink"));
+ // Wait 1s until snapshot phase finished, make sure the binlog
data is not lost.
+ Thread.sleep(1000L);
// step 3: make some redo log data for this round
makeFirstPartRedoLogForAddressTable(newlyAddedTable);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index 548108f94..ee868d0b8 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -309,7 +309,7 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
- connectorConfig.getQueryFetchSize());
+ connectorConfig.getSnapshotFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs,
table);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
index f9016cd27..bee31f981 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
@@ -725,6 +725,8 @@ class NewlyAddedTableITCase extends PostgresTestBase {
PostgresTestUtils.waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(
fetchedDataList,
TestValuesTableFactory.getResultsAsStrings("sink"));
+ // Wait 1s until the snapshot phase has finished, to make sure the WAL data is not lost.
+ Thread.sleep(1000L);
// step 3: make some wal log data for this round
makeFirstPartWalLogForAddressTable(getConnection(), newlyAddedTable);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 5d212c342..da0ac487b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -377,6 +377,21 @@ class PostgresSourceITCase extends PostgresTestBase {
optionalJobClient.get().cancel().get();
}
+ @Test
+ void testReadSingleTableMultipleFetch() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("debezium.snapshot.fetch.size", "2");
+ options.put("debezium.max.batch.size", "3");
+ testPostgresParallelSource(
+ 1,
+ DEFAULT_SCAN_STARTUP_MODE,
+ PostgresTestUtils.FailoverType.NONE,
+ PostgresTestUtils.FailoverPhase.NEVER,
+ new String[] {"Customers"},
+ RestartStrategies.fixedDelayRestart(1, 0),
+ options);
+ }
+
@Test
void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
// The data num is 21, set fetchSize = 22 to test the job is bounded.
@@ -1069,7 +1084,6 @@ class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
-
waitUntilJobRunning(tableResult);
CloseableIterator<Row> iterator = tableResult.collect();
Optional<JobClient> optionalJobClient = tableResult.getJobClient();
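The new test above exercises multiple fetch batches by passing Debezium settings as "debezium."-prefixed table options. The sketch below (a hypothetical helper, not connector code) illustrates the convention of stripping the prefix and forwarding the remainder to the Debezium engine as plain properties:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; the names below do not exist in the connector.
public class DebeziumOptionSketch {
    private static final String PREFIX = "debezium.";

    static Properties extractDebeziumProperties(Map<String, String> tableOptions) {
        Properties dbzProps = new Properties();
        tableOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(PREFIX)) {
                        // "debezium.snapshot.fetch.size" -> "snapshot.fetch.size"
                        dbzProps.setProperty(key.substring(PREFIX.length()), value);
                    }
                });
        return dbzProps;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("debezium.snapshot.fetch.size", "2");
        options.put("debezium.max.batch.size", "3");
        // Prints the two settings without the prefix, e.g. {snapshot.fetch.size=2, max.batch.size=3}
        System.out.println(extractDebeziumProperties(options));
    }
}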
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index 509064c5f..ee3dcdcb0 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
import org.apache.flink.cdc.connectors.postgres.testutils.TestTableId;
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
@@ -44,14 +45,19 @@ import org.apache.flink.table.types.DataType;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import org.apache.kafka.connect.source.SourceRecord;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link PostgresScanFetchTask}. */
class PostgresScanFetchTaskTest extends PostgresTestBase {
@@ -237,6 +243,45 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
+ @Test
+ void testSnapshotFetchSize() throws Exception {
+ customDatabase.createAndInitialize();
+ PostgresSourceConfigFactory sourceConfigFactory =
+ getMockPostgresSourceConfigFactory(customDatabase, schemaName, tableName, 10, true);
+ Properties properties = new Properties();
+ properties.setProperty("snapshot.fetch.size", "2");
+ sourceConfigFactory.debeziumProperties(properties);
+ PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+ PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
+ SnapshotSplit snapshotSplit = getSnapshotSplits(sourceConfig, postgresDialect).get(0);
+ PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
+ new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
+ final String selectSql =
+ PostgresQueryUtils.buildSplitScanQuery(
+ snapshotSplit.getTableId(),
+ snapshotSplit.getSplitKeyType(),
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null,
+ Collections.emptyList());
+
+ try (PostgresConnection jdbcConnection = postgresDialect.openJdbcConnection();
+ PreparedStatement selectStatement =
+ PostgresQueryUtils.readTableSplitDataStatement(
+ jdbcConnection,
+ selectSql,
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null,
+ snapshotSplit.getSplitStart(),
+ snapshotSplit.getSplitEnd(),
+ snapshotSplit.getSplitKeyType().getFieldCount(),
+ postgresSourceFetchTaskContext
+ .getDbzConnectorConfig()
+ .getSnapshotFetchSize());
+ ResultSet rs = selectStatement.executeQuery()) {
+ assertThat(rs.getFetchSize()).isEqualTo(2);
+ }
+ }
+
private List<String> getDataInSnapshotScan(
String[] changingDataSql,
String schemaName,
@@ -314,8 +359,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
sourceScanFetcher.close();
- Assertions.assertThat(sourceScanFetcher.getExecutorService()).isNotNull();
- Assertions.assertThat(sourceScanFetcher.getExecutorService().isTerminated()).isTrue();
+ assertThat(sourceScanFetcher.getExecutorService()).isNotNull();
+ assertThat(sourceScanFetcher.getExecutorService().isTerminated()).isTrue();
return formatResult(result, dataType);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 905d47131..68ef4690e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -17,27 +17,69 @@
package org.apache.flink.cdc.connectors.postgres.source.reader;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
-import org.apache.flink.cdc.connectors.postgres.testutils.TestTable;
+import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import static org.apache.flink.table.api.DataTypes.BIGINT;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
+import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link PostgresSourceReader}. */
-class PostgresSourceReaderTest {
+public class PostgresSourceReaderTest extends PostgresTestBase {
+ private static final String DB_NAME_PREFIX = "postgres";
+ private static final String SCHEMA_NAME = "customer";
+ private String slotName;
+ private final UniqueDatabase customDatabase =
+ new UniqueDatabase(
+ POSTGRES_CONTAINER,
+ DB_NAME_PREFIX,
+ SCHEMA_NAME,
+ POSTGRES_CONTAINER.getUsername(),
+ POSTGRES_CONTAINER.getPassword());
+
+ @BeforeEach
+ public void before() throws SQLException {
+ customDatabase.createAndInitialize();
+
+ this.slotName = getSlotName();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ customDatabase.removeSlot(slotName);
+ }
@Test
void testNotifyCheckpointWindowSizeOne() throws Exception {
@@ -69,23 +111,173 @@ class PostgresSourceReaderTest {
assertThat(completedCheckpointIds).containsExactly(101L);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testMultipleSnapshotSplit(boolean skipBackFill) throws Exception {
+ final DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("Id", DataTypes.BIGINT()),
+ DataTypes.FIELD("Name", DataTypes.STRING()),
+ DataTypes.FIELD("address", DataTypes.STRING()),
+ DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+ TableId tableId = new TableId(null, SCHEMA_NAME, "Customers");
+ RowType splitType =
+ RowType.of(
+ new LogicalType[] {DataTypes.INT().getLogicalType()}, new String[] {"Id"});
+ List<SnapshotSplit> snapshotSplits =
+ Arrays.asList(
+ new SnapshotSplit(
+ tableId,
+ 0,
+ splitType,
+ null,
+ new Integer[] {200},
+ null,
+ Collections.emptyMap()),
+ new SnapshotSplit(
+ tableId,
+ 1,
+ splitType,
+ new Integer[] {200},
+ new Integer[] {1500},
+ null,
+ Collections.emptyMap()),
+ new SnapshotSplit(
+ tableId,
+ 2,
+ splitType,
+ new Integer[] {1500},
+ null,
+ null,
+ Collections.emptyMap()));
+
+ // Step 1: start source reader and assign snapshot splits
+ PostgresSourceReader reader = createReader(-1, skipBackFill);
+ reader.start();
+ reader.addSplits(snapshotSplits);
+
+ String[] expectedRecords =
+ new String[] {
+ "+I[111, user_6, Shanghai, 123567891234]",
+ "+I[110, user_5, Shanghai, 123567891234]",
+ "+I[101, user_1, Shanghai, 123567891234]",
+ "+I[103, user_3, Shanghai, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "+I[118, user_7, Shanghai, 123567891234]",
+ "+I[121, user_8, Shanghai, 123567891234]",
+ "+I[123, user_9, Shanghai, 123567891234]",
+ "+I[109, user_4, Shanghai, 123567891234]",
+ "+I[1009, user_10, Shanghai, 123567891234]",
+ "+I[1011, user_12, Shanghai, 123567891234]",
+ "+I[1010, user_11, Shanghai, 123567891234]",
+ "+I[1013, user_14, Shanghai, 123567891234]",
+ "+I[1012, user_13, Shanghai, 123567891234]",
+ "+I[1015, user_16, Shanghai, 123567891234]",
+ "+I[1014, user_15, Shanghai, 123567891234]",
+ "+I[1017, user_18, Shanghai, 123567891234]",
+ "+I[1016, user_17, Shanghai, 123567891234]",
+ "+I[1019, user_20, Shanghai, 123567891234]",
+ "+I[1018, user_19, Shanghai, 123567891234]",
+ "+I[2000, user_21, Shanghai, 123567891234]"
+ };
+ // Step 2: wait until the snapshot splits have finished reading
+ List<String> actualRecords = consumeSnapshotRecords(reader, dataType);
+ assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
+ }
+
+ private List<String> consumeSnapshotRecords(
+ PostgresSourceReader sourceReader, DataType recordType) throws Exception {
+ // Poll all records from the assigned snapshot splits.
+ sourceReader.notifyNoMoreSplits();
+ final SimpleReaderOutput output = new SimpleReaderOutput();
+ InputStatus status = MORE_AVAILABLE;
+ while (END_OF_INPUT != status) {
+ status = sourceReader.pollNext(output);
+ }
+ final RecordsFormatter formatter = new RecordsFormatter(recordType);
+ return formatter.format(output.getResults());
+ }
+
private PostgresSourceReader createReader(final int lsnCommitCheckpointsDelay)
throws Exception {
+ return createReader(lsnCommitCheckpointsDelay, false);
+ }
+
+ private PostgresSourceReader createReader(
+ final int lsnCommitCheckpointsDelay, boolean skipBackFill) throws Exception {
final PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
final PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
- configFactory.hostname("host");
- configFactory.database("pgdb");
- configFactory.username("username");
- configFactory.password("password");
- configFactory.decodingPluginName("pgoutput");
+ configFactory.hostname(customDatabase.getHost());
+ configFactory.port(customDatabase.getDatabasePort());
+ configFactory.database(customDatabase.getDatabaseName());
+ configFactory.tableList(SCHEMA_NAME + ".Customers");
+ configFactory.username(customDatabase.getUsername());
+ configFactory.password(customDatabase.getPassword());
configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
- final TestTable customerTable =
- new TestTable(ResolvedSchema.of(Column.physical("id", BIGINT())));
- final DebeziumDeserializationSchema<?> deserializer = customerTable.getDeserializer();
+ configFactory.skipSnapshotBackfill(skipBackFill);
+ configFactory.decodingPluginName("pgoutput");
MockPostgresDialect dialect = new MockPostgresDialect(configFactory.create(0));
final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
new PostgresSourceBuilder.PostgresIncrementalSource<>(
- configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+ configFactory, new ForwardDeserializeSchema(), offsetFactory, dialect);
return source.createReader(new TestingReaderContext());
}
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+ private static class SimpleReaderOutput implements ReaderOutput<SourceRecord> {
+
+ private final List<SourceRecord> results = new ArrayList<>();
+
+ @Override
+ public void collect(SourceRecord record) {
+ results.add(record);
+ }
+
+ public List<SourceRecord> getResults() {
+ return results;
+ }
+
+ @Override
+ public void collect(SourceRecord record, long timestamp) {
+ collect(record);
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {}
+
+ @Override
+ public void markIdle() {}
+
+ @Override
+ public void markActive() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SourceOutput<SourceRecord> createOutputForSplit(java.lang.String splitId) {
+ return this;
+ }
+
+ @Override
+ public void releaseOutputForSplit(java.lang.String splitId) {}
+ }
+
+ private static class ForwardDeserializeSchema
+ implements DebeziumDeserializationSchema<SourceRecord> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
+ out.collect(record);
+ }
+
+ @Override
+ public TypeInformation<SourceRecord> getProducedType() {
+ return TypeInformation.of(SourceRecord.class);
+ }
+ }
}
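The parameterized testMultipleSnapshotSplit above runs both with and without skipping the snapshot backfill. Conceptually, the behaviour the commit targets is that with backfill skipped there is no high-watermark correction phase, so snapshot records can be forwarded as they are read instead of being buffered first. A rough sketch of that idea, using made-up class and method names that do not exist in the connector:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Conceptual sketch only; not the connector's fetcher implementation.
class SnapshotEmitSketch<T> {
    private final boolean skipBackfill;
    private final List<T> buffer = new ArrayList<>();

    SnapshotEmitSketch(boolean skipBackfill) {
        this.skipBackfill = skipBackfill;
    }

    // Called for every row read during the snapshot scan of a split.
    void onSnapshotRecord(T record, Consumer<T> downstream) {
        if (skipBackfill) {
            downstream.accept(record); // emit immediately; nothing will be reconciled later
        } else {
            buffer.add(record); // hold until the backfill phase can be merged in
        }
    }

    // Called once the split's high watermark is reached (only relevant when backfill is enabled).
    void onHighWatermark(Consumer<T> downstream) {
        buffer.forEach(downstream);
        buffer.clear();
    }
}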