ruanhang1993 commented on code in PR #3776:
URL: https://github.com/apache/flink-cdc/pull/3776#discussion_r2958729850
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java:
##########
@@ -242,12 +243,24 @@ private void createDataEventsForTable(
long exportStart = clock.currentTimeInMillis();
LOG.info("Exporting data from split '{}' of table {}",
snapshotSplit.splitId(), table.id());
+ String filter =
+ SnapshotFilterUtils.getSnapshotFilter(
+ sourceConfig.getSnapshotFilters(), table.id());
Review Comment:
Could we store the filters as a class member variable instead of
resolving them on every invocation?
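The following is a minimal sketch of the suggestion. It resolves each table's filter once and reuses it for every split of that table. It assumes a few things the diff does not show. Table ids are plain strings such as `db.orders`. The resolver function stands in for `SnapshotFilterUtils.getSnapshotFilter(sourceConfig.getSnapshotFilters(), table.id())`, whose matching rules are not shown here. The class `CachedSnapshotFilters` and its `whereClause` method are hypothetical and not Flink CDC API. `whereClause` also assumes the filter is ANDed with the split's chunk-boundary condition. The parentheses matter in that case. Without them, a user filter such as `id > 100 OR status = 'PAID'` would let rows from outside the chunk through.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical helper: resolves each table's snapshot filter once and reuses it
 * for every split of that table. Table ids are plain strings here (assumption).
 */
final class CachedSnapshotFilters {
    // Maps a table id to its filter expression, or null if none applies.
    // Stands in for SnapshotFilterUtils.getSnapshotFilter(filters, tableId).
    private final Function<String, String> resolver;

    // ConcurrentHashMap rejects null values, so "no filter" is cached as Optional.empty().
    private final Map<String, Optional<String>> cache = new ConcurrentHashMap<>();

    CachedSnapshotFilters(Function<String, String> resolver) {
        this.resolver = resolver;
    }

    /** Returns the table's filter, invoking the resolver at most once per table id. */
    Optional<String> filterFor(String tableId) {
        return cache.computeIfAbsent(tableId, id -> Optional.ofNullable(resolver.apply(id)));
    }

    /** ANDs the chunk condition with the filter; parentheses keep a user OR inside the chunk bounds. */
    String whereClause(String tableId, String chunkCondition) {
        return filterFor(tableId)
                .map(filter -> "(" + chunkCondition + ") AND (" + filter + ")")
                .orElse(chunkCondition);
    }
}
```

If the set of captured tables is already known when the read task is constructed, the filters could be resolved eagerly in the constructor instead. The lazy cache only pays off when tables are discovered later.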
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java:
##########
@@ -206,6 +210,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+ options.add(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER);
Review Comment:
I am considering whether we should implement `SupportsFilterPushDown` for
the SQL connector instead of adding a new config option.
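Flink's `SupportsFilterPushDown` ability lets a `ScanTableSource` receive the planner's filters through `applyFilters(List<ResolvedExpression>)`. The source then answers with a `Result` that splits them into accepted and remaining filters. The sketch below models that contract with plain Java types so it runs without Flink on the classpath. `SimplePredicate`, `PushDownResult`, and `FilterPushDownSketch` are hypothetical stand-ins, not Flink or Flink CDC API, and the predicate literal is assumed to be an already-escaped SQL literal. One design point is relevant to this PR. The proposed option only filters rows read during the snapshot. A conservative source would therefore report every filter as remaining, even the accepted ones, so the planner keeps evaluating them on binlog rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical simplified predicate "column op literal"; literal is assumed to be escaped SQL. */
record SimplePredicate(String column, String op, String literal) {
    String toSql() {
        return "`" + column + "` " + op + " " + literal;
    }
}

/** Mirrors the accepted/remaining split of SupportsFilterPushDown.Result, plus the derived WHERE clause. */
record PushDownResult(List<SimplePredicate> accepted, List<SimplePredicate> remaining, String whereClause) {}

final class FilterPushDownSketch {
    // Operators this sketch can translate into a MySQL WHERE clause.
    private static final Set<String> SUPPORTED_OPS = Set.of("=", "<>", "<", "<=", ">", ">=");

    static PushDownResult applyFilters(List<SimplePredicate> filters) {
        List<SimplePredicate> accepted = new ArrayList<>();
        for (SimplePredicate p : filters) {
            if (SUPPORTED_OPS.contains(p.op())) {
                accepted.add(p);
            }
        }
        // Snapshot-only filtering: binlog rows are not filtered by the source,
        // so every predicate is also reported as remaining for the planner.
        List<SimplePredicate> remaining = List.copyOf(filters);
        String where = accepted.stream()
                .map(SimplePredicate::toSql)
                .collect(Collectors.joining(" AND "));
        return new PushDownResult(List.copyOf(accepted), remaining, where.isEmpty() ? null : where);
    }
}
```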
##########
docs/content/docs/connectors/flink-sources/mysql-cdc.md:
##########
@@ -460,6 +460,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
+ <tr>
+ <td>scan.snapshot.filter</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`.</td>
+ </tr>
Review Comment:
It would be better to add a usage example for this feature.
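A usage example should show one pitfall. The filter is itself a SQL string that sits inside a quoted DDL option value, so any single quote in it must be doubled, as Flink SQL string literals require. The hypothetical helper below renders a `CREATE TABLE ... WITH (...)` statement for a `mysql-cdc` table to illustrate this. `MySqlCdcDdlExample` is not part of Flink CDC, and the option values in the usage note are placeholders.

```java
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that renders a Flink SQL CREATE TABLE statement for a mysql-cdc table. */
final class MySqlCdcDdlExample {
    /** Quotes a value as a Flink SQL string literal; each embedded single quote is doubled. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Renders the DDL; option order follows the map's iteration order. */
    static String createTable(String name, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + name + " (\n" + columns + "\n) WITH (\n" + with + "\n)";
    }
}
```

As an example, pass a `LinkedHashMap` containing `connector` = `mysql-cdc`, the usual `hostname`, `port`, `username`, `password`, `database-name` and `table-name` entries, and `scan.snapshot.filter` = `id > 100 AND status = 'PAID'`. The filter entry then renders as `'scan.snapshot.filter' = 'id > 100 AND status = ''PAID'''`.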
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java:
##########
@@ -330,6 +332,142 @@ void testAddChunkKeyColumns() {
});
}
+ @Test
+ void testAddSnapshotFilters() {
Review Comment:
We need more tests for this feature (IT and E2E tests).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.