lvyanquan commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r1992509136
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java:
##########
@@ -336,6 +336,92 @@ public void testSyncWholeDatabase() throws Exception {
}
}
+ @Test
+ public void testSyncWholeDatabaseInBatchMode() throws Exception {
+ String databaseName = mysqlInventoryDatabase.getDatabaseName();
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: mysql\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
Review Comment:
Do we need to add `scan.startup.mode: snapshot` in MySQL source config?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -89,7 +90,16 @@ public MySqlPipelineRecordEmitter(
protected void processElement(
SourceRecord element, SourceOutput<Event> output, MySqlSplitState
splitState)
throws Exception {
- if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState())
{
+ if
(StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())) {
+ // In snapshot mode, we simply emit all schemas at once.
+ createTableEventCache.forEach(
+ createTableEvent -> {
+ output.collect(createTableEvent);
+ TableId tableId =
TableId.parse(createTableEvent.tableId().identifier());
+ alreadySendCreateTableTables.add(tableId);
+ });
+ alreadySendCreateTableForBinlogSplit = true;
+ } else if (isLowWatermarkEvent(element) &&
splitState.isSnapshotSplitState()) {
Review Comment:
You may need to rebase to solve the conflict as there is another change in
MySqlPipelineRecordEmitter.java that introduced in
https://github.com/apache/flink-cdc/commit/eee0cc06f7bf842034d0e22bc45f8ffb811f7f78#diff-7da556d0b9e83dcd71a4a2319c7588c813a5c14ac0dfc180fd096ae7b2a4a5b0.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]