This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 7a57f554e [FLINK-37128][cdc-connector-mysql] Binlog reader skip shouldEmit filter when snapshot reader skip backfill (#3874)
7a57f554e is described below
commit 7a57f554e84e1d9fcae41aacc63acb82586acad2
Author: Junbo wang <[email protected]>
AuthorDate: Tue May 13 11:57:46 2025 +0800
    [FLINK-37128][cdc-connector-mysql] Binlog reader skip shouldEmit filter when snapshot reader skip backfill (#3874)
---
.../docs/connectors/flink-sources/mongodb-cdc.md | 12 ++
 .../docs/connectors/flink-sources/mysql-cdc.md     | 12 ++
.../docs/connectors/flink-sources/oracle-cdc.md | 12 ++
.../docs/connectors/flink-sources/postgres-cdc.md | 12 ++
.../docs/connectors/flink-sources/sqlserver-cdc.md | 12 ++
 .../docs/connectors/pipeline-connectors/mysql.md   | 12 ++
.../docs/connectors/flink-sources/mongodb-cdc.md | 12 ++
.../docs/connectors/flink-sources/mysql-cdc.md | 12 ++
.../docs/connectors/flink-sources/oracle-cdc.md | 12 ++
.../docs/connectors/flink-sources/postgres-cdc.md | 12 ++
.../docs/connectors/flink-sources/sqlserver-cdc.md | 12 ++
.../docs/connectors/pipeline-connectors/mysql.md | 12 ++
.../mysql/factory/MySqlDataSourceFactory.java | 6 +-
.../mysql/source/MySqlDataSourceOptions.java | 8 ++
.../external/IncrementalSourceStreamFetcher.java | 6 +
.../mysql/debezium/reader/BinlogSplitReader.java | 6 +
.../debezium/reader/BinlogSplitReaderTest.java | 159 ++++++++++++++++++++-
 17 files changed, 322 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index 0e46ec2a8..faa85c1c2 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -336,6 +336,18 @@ MongoDB change event records do not carry the pre-update state of the message. Therefore, we can only
        This is an experimental feature. Defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
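The warning above is the crux of the trade-off: with backfill skipped, changes that land inside the snapshot window are only delivered with at-least-once semantics, so consumers must tolerate replays. A sink that materializes the change log by primary key absorbs such replays naturally, since re-applying an update or re-deleting a missing row converges to the same state. A minimal sketch of that idea (hypothetical event model, not part of this patch):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical keyed materializer: applying a replayed UPDATE or DELETE a
    // second time leaves the table in the same state, which is the kind of
    // "special handling" the skip-backfill warning asks for.
    final class UpsertMaterializer {
        enum Op { INSERT, UPDATE, DELETE }
        record Event(Op op, long key, String row) {}

        private final Map<Long, String> table = new HashMap<>();

        void apply(Event e) {
            switch (e.op()) {
                case INSERT, UPDATE -> table.put(e.key(), e.row()); // idempotent upsert
                case DELETE -> table.remove(e.key()); // deleting twice is harmless
            }
        }
    }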
diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index 7ae1ae449..9ce67d643 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -413,6 +413,18 @@ Flink SQL> SELECT * FROM orders;
          <li>false (default): All types of messages are sent as is.</li>
        </td>
     </tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be replayed later in the binlog reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some binlog events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed binlog events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
index 368b8fe18..2249ba9eb 100644
--- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -433,6 +433,18 @@ Connector Options
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index 25bfacede..0bde3b9a1 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -256,6 +256,18 @@ Connector Options
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
index 49b97f2e2..d82c77161 100644
--- a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
@@ -249,6 +249,18 @@ Connector Options
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index ccbfd1fbc..4f38442b7 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -323,6 +323,18 @@ pipeline:
       This is an experimental feature. Defaults to false.
     </td>
   </tr>
+  <tr>
+    <td>scan.incremental.snapshot.backfill.skip</td>
+    <td>optional</td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>
+      Whether to skip backfill in the snapshot reading phase.<br>
+      If backfill is skipped, changes on the captured tables during the snapshot phase will be replayed later in the binlog reading phase instead of being merged into the snapshot.<br>
+      WARNING: Skipping backfill might lead to data inconsistency, because some binlog events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+      For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed binlog events should be handled specially.
+    </td>
+  </tr>
<tr>
<td>metadata.list</td>
<td>optional</td>
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index 944c038b0..5d6e9bd77 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -361,6 +361,18 @@ Connector Options
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 2a7f3ca72..063a5b126 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -439,6 +439,18 @@ During a snapshot operation, the connector will query each included table to pro
<li>false (default): All types of messages are sent as is.</li>
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
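For DataStream API users of the MySQL connector, the same switch is exposed on the source builder. Below is a sketch of a source with backfill skipped, assuming the skipSnapshotBackfill builder method that backs this option in flink-connector-mysql-cdc; all connection values are placeholders:

    import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    // Sketch: MySQL CDC source with snapshot backfill skipped. Changes made
    // during the snapshot phase are replayed from the binlog afterwards
    // (at-least-once for that window) instead of being merged into the snapshot.
    public class SkipBackfillExample {
        public static MySqlSource<String> buildSource() {
            return MySqlSource.<String>builder()
                    .hostname("localhost")
                    .port(3306)
                    .databaseList("mydb")
                    .tableList("mydb.orders")
                    .username("flinkuser")
                    .password("flinkpw")
                    .skipSnapshotBackfill(true) // mirrors scan.incremental.snapshot.backfill.skip
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();
        }
    }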
diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
index 769421899..a91c06588 100644
--- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -434,6 +434,18 @@ Connector Options
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index b8e476801..8d8b61b42 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -253,6 +253,18 @@ SELECT * FROM shipments;
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
index 683ed15a3..d69a04615 100644
--- a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
@@ -249,6 +249,18 @@ Connector Options
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 82e2efab2..622a67b93 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -354,6 +354,18 @@ pipeline:
Experimental option, defaults to false.
</td>
</tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to skip backfill in the snapshot reading phase.<br>
+        If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
+        WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened during the snapshot phase might be replayed (only at-least-once semantics are promised).
+        For example, an update to an already-updated value in the snapshot, or a delete of an already-deleted entry in the snapshot, may be replayed. These replayed change log events should be handled specially.
+      </td>
+    </tr>
<tr>
<td>metadata.list</td>
<td>optional</td>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 6267e4642..f522adfdb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -78,6 +78,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
@@ -154,6 +155,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
         boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
+        boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);

         Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@@ -218,7 +220,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
                         .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
                         .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
                         .useLegacyJsonFormat(useLegacyJsonFormat)
-                        .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst);
+                        .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
+                        .skipSnapshotBackfill(skipSnapshotBackfill);

         List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -355,6 +358,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
         options.add(PARSE_ONLINE_SCHEMA_CHANGES);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         return options;
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 2ffad8b2e..6aff556e7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -322,4 +322,12 @@ public class MySqlDataSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =
+            ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, [...]
 }
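The new option follows the usual ConfigOption contract: when the key is absent from the user's configuration, get() returns the declared default (false), and an explicit setting overrides it. A stand-in demonstration using Flink's plain Configuration class; the pipeline connector's own configuration type resolves options analogously:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    // Demonstrates default/override resolution for a boolean ConfigOption with
    // the same key as the one added above.
    public class BackfillSkipOptionDemo {
        static final ConfigOption<Boolean> BACKFILL_SKIP =
                ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription("Whether to skip backfill in the snapshot reading phase.");

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            System.out.println(conf.get(BACKFILL_SKIP)); // false: falls back to the default

            conf.set(BACKFILL_SKIP, true);
            System.out.println(conf.get(BACKFILL_SKIP)); // true: explicit setting wins
        }
    }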
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 1cde14c47..61c2b9ffb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -64,6 +64,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     private Map<TableId, List<FinishedSnapshotSplitInfo>> finishedSplitsInfo;
     // tableId -> the max splitHighWatermark
     private Map<TableId, Offset> maxSplitHighWatermarkMap;
+    private final boolean isBackfillSkipped;

     private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
@@ -74,6 +75,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
         this.currentTaskRunning = true;
         this.pureStreamPhaseTables = new HashSet<>();
+        this.isBackfillSkipped = taskContext.getSourceConfig().isSkipSnapshotBackfill();
     }

     @Override
@@ -184,6 +186,10 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         }
         // only the table who captured snapshot splits need to filter
         if (finishedSplitsInfo.containsKey(tableId)) {
+            // if backfill is skipped, there is no need to filter
+            if (isBackfillSkipped) {
+                return true;
+            }
             for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                 if (taskContext.isRecordBetween(
                         sourceRecord,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 002d04886..87a435ff6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -89,6 +89,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
     private final StoppableChangeEventSourceContext changeEventSourceContext =
             new StoppableChangeEventSourceContext();
     private final boolean isParsingOnLineSchemaChanges;
+    private final boolean isBackfillSkipped;

     private static final long READER_CLOSE_TIMEOUT = 30L;
@@ -110,6 +111,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         this.pureBinlogPhaseTables = new HashSet<>();
         this.isParsingOnLineSchemaChanges =
                 statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
+        this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
     }

     public void submitSplit(MySqlSplit mySqlSplit) {
@@ -257,6 +259,10 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         // only the table who captured snapshot splits need to filter
         if (finishedSplitsInfo.containsKey(tableId)) {
+            // if backfill is skipped, there is no need to filter
+            if (isBackfillSkipped) {
+                return true;
+            }
             RowType splitKeyType =
                     ChunkUtils.getChunkKeyColumnType(
                             statefulTaskContext.getDatabaseSchema().tableFor(tableId),
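Both readers gain the same early exit: once backfill is skipped, no change events were merged into the snapshot, so the usual high-watermark/key-range filter would wrongly suppress events that fall inside a finished snapshot split; every event for a captured table must be emitted instead. A simplified model of that decision follows (types and fields are illustrative stand-ins, not the connector's actual API):

    import java.util.List;
    import java.util.Map;

    // Illustrative stand-in for the readers' shouldEmit() bookkeeping.
    final class EmitFilterSketch {
        record ChangeEvent(String tableId, long key, long offset) {}
        record FinishedSplit(long keyStart, long keyEnd, long highWatermark) {}

        private final boolean isBackfillSkipped;
        private final Map<String, List<FinishedSplit>> finishedSplitsInfo;

        EmitFilterSketch(
                boolean isBackfillSkipped, Map<String, List<FinishedSplit>> finishedSplitsInfo) {
            this.isBackfillSkipped = isBackfillSkipped;
            this.finishedSplitsInfo = finishedSplitsInfo;
        }

        boolean shouldEmit(ChangeEvent event) {
            List<FinishedSplit> splits = finishedSplitsInfo.get(event.tableId());
            if (splits == null) {
                return true; // table had no snapshot splits, nothing to deduplicate against
            }
            if (isBackfillSkipped) {
                return true; // nothing was merged into the snapshot; replay everything
            }
            for (FinishedSplit split : splits) {
                if (event.key() >= split.keyStart() && event.key() <= split.keyEnd()) {
                    // emit only events past this split's high watermark
                    return event.offset() > split.highWatermark();
                }
            }
            return true;
        }
    }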
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 8901408eb..d254d94ad 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
+import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
@@ -60,6 +61,7 @@ import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -175,6 +177,118 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
+    @Test
+    void testSnapshotScanSkipBackfillWithPostHighWatermark() throws Exception {
+        String tableName = "customers_even_dist";
+        testSnapshotScanSkipBackfill(
+                getSnapshotPhaseHooksWithPostHighWatermark(tableName), tableName);
+    }
+
+    @Test
+    void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
+        String tableName = "customers_even_dist";
+        testSnapshotScanSkipBackfill(
+                getSnapshotPhaseHooksWithPreHighWatermark(tableName), tableName);
+    }
+
+    void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String tableName)
+            throws Exception {
+        customerDatabase.createAndInitialize();
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, true);
+        binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        List<MySqlSnapshotSplit> splits = getMySqlSplits(new String[] {tableName}, sourceConfig);
+        String[] expected =
+                new String[] {
+                    "+I[101, user_1, Shanghai, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "+I[103, user_3, Shanghai, 123567891234]",
+                    "+I[104, user_4, Shanghai, 123567891234]",
+                    "+I[105, user_5, Shanghai, 123567891234]",
+                    "+I[106, user_6, Shanghai, 123567891234]",
+                    "+I[107, user_7, Shanghai, 123567891234]",
+                    "+I[108, user_8, Shanghai, 123567891234]",
+                    "-U[103, user_3, Shanghai, 123567891234]",
+                    "+U[103, user_3, Hangzhou, 123567891234]",
+                    "-D[102, user_2, Shanghai, 123567891234]",
+                    "+I[102, user_2, Hangzhou, 123567891234]",
+                    "-U[103, user_3, Hangzhou, 123567891234]",
+                    "+U[103, user_3, Shanghai, 123567891234]",
+                    "-U[103, user_3, Shanghai, 123567891234]",
+                    "+U[103, user_3, Hangzhou, 123567891234]",
+                    "-D[102, user_2, Hangzhou, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "-U[103, user_3, Hangzhou, 123567891234]",
+                    "+U[103, user_3, Shanghai, 123567891234]",
+                    "+I[2001, user_22, Shanghai, 123567891234]",
+                    "+I[2002, user_23, Shanghai, 123567891234]",
+                    "+I[2003, user_24, Shanghai, 123567891234]"
+                };
+
+        // skipping snapshot backfill makes the high watermark equal to the low watermark, so 2
+        // splits are needed to read all the change events
+        List<String> actual =
+                readBinlogSplitsFromSnapshotSplits(
+                        splits,
+                        dataType,
+                        sourceConfig,
+                        2,
+                        23,
+                        splits.get(splits.size() - 1).getTableId(),
+                        snapshotHooks);
+        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+    }
+
+    @NotNull
+    private SnapshotPhaseHooks getSnapshotPhaseHooksWithPreHighWatermark(String tableName) {
+        String tableId = customerDatabase.getDatabaseName() + "." + tableName;
+        String[] changingDataSql =
+                new String[] {
+                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+                    "DELETE FROM " + tableId + " where id = 102",
+                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')",
+                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
+                };
+
+        SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+        snapshotHooks.setPreHighWatermarkAction(
+                (mySqlConnection, split) -> {
+                    if (split.splitId().equals(tableId + ":0")) {
+                        mySqlConnection.execute(changingDataSql);
+                        mySqlConnection.commit();
+                    }
+                });
+        return snapshotHooks;
+    }
+
+    @NotNull
+    private SnapshotPhaseHooks getSnapshotPhaseHooksWithPostHighWatermark(String tableName) {
+        String tableId = customerDatabase.getDatabaseName() + "." + tableName;
+        String[] changingDataSql =
+                new String[] {
+                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+                    "DELETE FROM " + tableId + " where id = 102",
+                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')",
+                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
+                };
+
+        SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+        snapshotHooks.setPostHighWatermarkAction(
+                (mySqlConnection, split) -> {
+                    if (split.splitId().equals(tableId + ":0")) {
+                        mySqlConnection.execute(changingDataSql);
+                        mySqlConnection.commit();
+                    }
+                });
+        return snapshotHooks;
+    }
+
@Test
void testReadAllBinlogSplitsForOneTable() throws Exception {
customerDatabase.createAndInitialize();
@@ -751,7 +865,11 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
         // Create config and initializer client and connections
         MySqlSourceConfig sourceConfig =
-                getConfigFactory(MYSQL_CONTAINER, customerDatabase, new String[] {"customers"})
+                getConfigFactory(
+                                MYSQL_CONTAINER,
+                                customerDatabase,
+                                new String[] {"customers"},
+                                false)
                         .startupOptions(StartupOptions.latest())
                         .heartbeatInterval(heartbeatInterval)
                         .debeziumProperties(dbzProps)
@@ -1048,10 +1166,29 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
             int expectedSize,
             TableId binlogChangeTableId)
             throws Exception {
+        return readBinlogSplitsFromSnapshotSplits(
+                sqlSplits,
+                dataType,
+                sourceConfig,
+                scanSplitsNum,
+                expectedSize,
+                binlogChangeTableId,
+                SnapshotPhaseHooks.empty());
+    }
+
+    private List<String> readBinlogSplitsFromSnapshotSplits(
+            List<MySqlSnapshotSplit> sqlSplits,
+            DataType dataType,
+            MySqlSourceConfig sourceConfig,
+            int scanSplitsNum,
+            int expectedSize,
+            TableId binlogChangeTableId,
+            SnapshotPhaseHooks snapshotHooks)
+            throws Exception {
         final StatefulTaskContext statefulTaskContext =
                 new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
         final SnapshotSplitReader snapshotSplitReader =
-                new SnapshotSplitReader(statefulTaskContext, 0);
+                new SnapshotSplitReader(statefulTaskContext, 0, snapshotHooks);
// step-1: read snapshot splits firstly
List<SourceRecord> snapshotRecords = new ArrayList<>();
@@ -1298,22 +1435,31 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
             UniqueDatabase database,
             StartupOptions startupOptions,
             String[] captureTables) {
-        return getConfigFactory(container, database, captureTables)
+        return getConfigFactory(container, database, captureTables, false)
                 .startupOptions(startupOptions)
                 .createConfig(0);
     }

     private MySqlSourceConfig getConfig(String[] captureTables) {
-        return getConfig(MYSQL_CONTAINER, customerDatabase, captureTables);
+        return getConfig(captureTables, false);
+    }
+
+    private MySqlSourceConfig getConfig(String[] captureTables, boolean skipSnapshotBackfill) {
+        return getConfigFactory(
+                        MYSQL_CONTAINER, customerDatabase, captureTables, skipSnapshotBackfill)
+                .createConfig(0);
     }

     private MySqlSourceConfig getConfig(
             MySqlContainer container, UniqueDatabase database, String[] captureTables) {
-        return getConfigFactory(container, database, captureTables).createConfig(0);
+        return getConfigFactory(container, database, captureTables, false).createConfig(0);
     }

     private MySqlSourceConfigFactory getConfigFactory(
-            MySqlContainer container, UniqueDatabase database, String[] captureTables) {
+            MySqlContainer container,
+            UniqueDatabase database,
+            String[] captureTables,
+            boolean skipSnapshotBackfill) {
         String[] captureTableIds =
                 Arrays.stream(captureTables)
                         .map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -1327,6 +1473,7 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
                 .username(database.getUsername())
                 .splitSize(4)
                 .fetchSize(2)
+                .skipSnapshotBackfill(skipSnapshotBackfill)
                 .password(database.getPassword());
     }