Copilot commented on code in PR #3776:
URL: https://github.com/apache/flink-cdc/pull/3776#discussion_r2730199935
##########
docs/content/docs/connectors/pipeline-connectors/mysql.md:
##########
@@ -364,6 +364,17 @@ pipeline:
List of readable metadata from SourceRecord to be passed to downstream
and could be used in transform module, split by `,`. Available readable
metadata are: op_ts.
</td>
</tr>
+ <tr>
+ <td>scan.snapshot.filters</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). <br>
+ By default, no filter is applied, meaning the entire table will be synchronized. <br>
+ A colon (:) separates table name and filter expression, while a semicolon (;) separate multiple filters,
+ e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`.
Review Comment:
Documentation mismatch: this table lists `scan.snapshot.filters` as a
`String` using a `table:filter;...` format, but the pipeline MySQL connector
option is implemented as `List<Map<String,String>>` (see
`MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS`) and the example YAML in this PR
uses the list form. Please update the docs to reflect the actual type/format
(and keep it consistent with the PR description/examples).
```suggestion
<td>List<Map<String, String>></td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expressions (SQL WHERE clauses). <br>
This option is configured as a list of filter specifications, where each specification is represented as a map of string key-value pairs (as shown in the example configuration below). <br>
By default, no filter is applied, meaning the entire table will be synchronized.
```
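For context, a `List<Map<String, String>>` option can be declared with Flink's map-type list builder; the sketch below is illustrative only (the key name comes from this PR, but the builder chain and description are assumptions, not the actual `MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS` definition):
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a list-of-maps option declaration, for illustration only. */
public class SnapshotFilterOptionSketch {

    public static final ConfigOption<List<Map<String, String>>> SCAN_SNAPSHOT_FILTERS =
            ConfigOptions.key("scan.snapshot.filters")
                    .mapType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            "Per-table SQL WHERE clauses applied while reading the table snapshot; "
                                    + "each list entry is a map of string key-value pairs.");
}
```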
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import io.debezium.relational.TableId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utilities to filter snapshot of table. */
+public class SnapshotFilterUtils {
+
+ private SnapshotFilterUtils() {}
+
+ private static final Map<Map<String, String>, Map<Selectors, String>> cache = new HashMap<>();
+
+ /**
+ * Don't worry about atomicity. We don't need to use the synchronized keyword to ensure thread
+ * safety here. Since filters come from the source configuration, they shouldn’t be changed
+ * during runtime. So the result will always be idempotent.
+ */
+ private static Map<Selectors, String> toSelector(Map<String, String> filters) {
+ Map<Selectors, String> cached = cache.get(filters);
+ if (cached != null) {
+ return cached;
+ }
+
+ Map<Selectors, String> snapshotFilters = new HashMap<>();
+ filters.forEach(
+ (table, filter) -> {
+ Selectors selector =
+ new Selectors.SelectorsBuilder().includeTables(table).build();
+ snapshotFilters.put(selector, filter);
+ });
+ cache.put(filters, snapshotFilters);
+
+ return snapshotFilters;
Review Comment:
`SnapshotFilterUtils` uses a static mutable `HashMap` cache without
synchronization. Even if the computed result is idempotent, concurrent
`get/put` on a `HashMap` is not thread-safe and can lead to data races or map
corruption. Consider removing this cache or switching to a `ConcurrentHashMap`
with `computeIfAbsent` (and avoid using a mutable `Map` as the cache key).
```suggestion
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Utilities to filter snapshot of table. */
public class SnapshotFilterUtils {

    private SnapshotFilterUtils() {}

    private static final Map<Map<String, String>, Map<Selectors, String>> cache =
            new ConcurrentHashMap<>();

    /**
     * Converts the given filters to a map keyed by {@link Selectors}, caching the result.
     *
     * <p>The cache is backed by a {@link ConcurrentHashMap} to be safe under concurrent access.
     * To avoid using a mutable {@link Map} as the cache key, an immutable copy of the input
     * filters is created and used as the key.
     */
    private static Map<Selectors, String> toSelector(Map<String, String> filters) {
        // Create an immutable copy of the filters to avoid using a mutable map as the cache key.
        Map<String, String> immutableFilters =
                Collections.unmodifiableMap(new HashMap<>(filters));
        return cache.computeIfAbsent(
                immutableFilters,
                key -> {
                    Map<Selectors, String> snapshotFilters = new HashMap<>();
                    key.forEach(
                            (table, filter) -> {
                                Selectors selector =
                                        new Selectors.SelectorsBuilder()
                                                .includeTables(table)
                                                .build();
                                snapshotFilters.put(selector, filter);
                            });
                    return snapshotFilters;
                });
```
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java:
##########
@@ -241,6 +244,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
+ .snapshotFilters(
+ escapeDot(database) + "." + escapeDot(tableName),
+ snapshotFilter)
.build();
Review Comment:
`snapshotFilter` is only forwarded into `.snapshotFilters(...)` when
`enableParallelRead` is true. If a user sets `scan.snapshot.filter` while
parallel snapshot is disabled, the option will currently be silently ignored.
Consider either (1) validating and failing fast when the option is set but
parallel read is disabled, or (2) implementing equivalent filtering for the
non-parallel Debezium source path, and documenting the supported modes.
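A fail-fast version of option (1) could be as small as the sketch below; the option key, field names, and exception wording are assumptions based on the surrounding context, not the PR's actual code:
```java
import org.apache.flink.table.api.ValidationException;

import javax.annotation.Nullable;

/** Hypothetical validation sketch: reject the filter option instead of silently ignoring it. */
final class SnapshotFilterValidation {

    static void validate(boolean enableParallelRead, @Nullable String snapshotFilter) {
        // If the user configured a snapshot filter but parallel (incremental) snapshot reading
        // is disabled, fail fast rather than dropping the option on the non-parallel path.
        if (!enableParallelRead && snapshotFilter != null) {
            throw new ValidationException(
                    "'scan.snapshot.filter' is only supported when "
                            + "'scan.incremental.snapshot.enabled' is 'true'.");
        }
    }
}
```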
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java:
##########
@@ -172,6 +214,10 @@ private static String buildSplitQuery(
condition = sql.toString();
}
+ if (filter != null) {
+ condition = condition == null ? filter : filter + " AND " + condition;
Review Comment:
When combining the user-provided `filter` with additional split conditions,
the code concatenates it as `filter + " AND " + condition` without parentheses.
If the filter contains `OR` (or other operators), SQL precedence can change the
intended semantics and even break split boundary guarantees (leading to
duplicates/missing rows across splits). Wrap the filter in parentheses
everywhere it is combined (here and similarly in `queryMin` /
`queryNextChunkMax`).
```suggestion
condition = condition == null ? filter : "(" + filter + ") AND " + condition;
```
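To illustrate why the parentheses matter (plain Java string concatenation, not the connector code): with a filter containing `OR`, the unparenthesized form lets `AND` bind only to the last disjunct, so rows matching the earlier disjunct leak into every split.
```java
/** Illustrative only: shows how SQL operator precedence changes the combined predicate. */
public class FilterPrecedenceDemo {

    public static void main(String[] args) {
        String filter = "region = 'EU' OR region = 'US'";  // user-provided snapshot filter
        String splitCondition = "id >= 100 AND id < 200";   // chunk boundary condition

        // AND binds tighter than OR, so this matches every 'EU' row regardless of the id range.
        String unsafe = filter + " AND " + splitCondition;
        // Wrapping the filter keeps it a single unit and preserves the split boundaries.
        String safe = "(" + filter + ") AND " + splitCondition;

        System.out.println(unsafe); // region = 'EU' OR region = 'US' AND id >= 100 AND id < 200
        System.out.println(safe);   // (region = 'EU' OR region = 'US') AND id >= 100 AND id < 200
    }
}
```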
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils}. */
+public class SnapshotFilterUtilsTest {
+
+ @Test
+ public void test() {
Review Comment:
Test naming is very generic (`test`). In this module, other tests use
descriptive method names (e.g., `ObjectUtilsTest#testMinus`,
`RecordUtilsTest#testSplitKeyRangeContains`). Renaming this to something like
`testGetSnapshotFilter` would make failures easier to interpret.
```suggestion
public void testGetSnapshotFilter() {
```
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java:
##########
@@ -56,6 +61,31 @@ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
});
}
+ public static Long queryRowCnt(
+ JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter)
+ throws SQLException {
+
+ if (filter == null) {
+ return queryApproximateRowCnt(jdbc, tableId);
+ }
+
+ final String cntQuery =
+ String.format(
+ "SELECT COUNT(%s) FROM %s WHERE %s",
+ quote(columnName), quote(tableId), filter);
Review Comment:
`queryRowCnt` uses `COUNT(columnName)` when a filter is provided. This can
undercount rows if the split column contains NULLs, which can skew chunk sizing
/ distribution-factor calculations. Use `COUNT(*)` (or `COUNT(1)`) for row
counts, and consider wrapping the filter in parentheses in the WHERE clause for
safety.
```suggestion
"SELECT COUNT(1) FROM %s WHERE (%s)",
quote(tableId), filter);
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java:
##########
@@ -285,6 +288,19 @@ public DataSource createDataSource(Context context) {
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}
+
+ List<Map<String, String>> snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS);
+ if (snapshotFilters != null && !snapshotFilters.isEmpty()) {
+ Map<String, String> snapshotFiltersMap =
+ snapshotFilters.stream()
+ .collect(
+ Collectors.toMap(
+ it -> it.get(SNAPSHOT_FILTER_TABLE_KEY),
+ it -> it.get(SNAPSHOT_FILTER_FILTER_KEY)));
+ LOG.info("Add snapshotFilters {}.", snapshotFiltersMap);
+ configFactory.snapshotFilters(snapshotFiltersMap);
+ }
Review Comment:
`scan.snapshot.filters` is parsed via `Collectors.toMap(...)` without
validating required keys (`table`/`filter`) and without handling duplicates.
This can throw a generic `NullPointerException`/`IllegalStateException` on
malformed or duplicate entries and also drops the original list order (which
matters if patterns overlap). Consider validating each entry and building a
`LinkedHashMap` with explicit duplicate handling (e.g., reject with a clear
message).
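One possible shape for that parsing/validation, as a rough sketch; the entry keys (`table`, `filter`) are inferred from `SNAPSHOT_FILTER_TABLE_KEY`/`SNAPSHOT_FILTER_FILTER_KEY`, and the exception type and messages are placeholders:
```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parsing sketch for scan.snapshot.filters; names and messages are assumptions. */
final class SnapshotFilterParsing {

    static Map<String, String> parse(List<Map<String, String>> snapshotFilters) {
        // LinkedHashMap keeps the user-defined order, which matters when table patterns overlap.
        Map<String, String> result = new LinkedHashMap<>();
        for (Map<String, String> entry : snapshotFilters) {
            String table = entry.get("table");
            String filter = entry.get("filter");
            if (table == null || filter == null) {
                throw new IllegalArgumentException(
                        "Each 'scan.snapshot.filters' entry must define both 'table' and 'filter', got: "
                                + entry);
            }
            if (result.put(table, filter) != null) {
                throw new IllegalArgumentException(
                        "Duplicate table pattern in 'scan.snapshot.filters': " + table);
            }
        }
        return result;
    }
}
```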
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import io.debezium.relational.TableId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utilities to filter snapshot of table. */
+public class SnapshotFilterUtils {
+
+ private SnapshotFilterUtils() {}
+
+ private static final Map<Map<String, String>, Map<Selectors, String>> cache = new HashMap<>();
+
+ /**
+ * Don't worry about atomicity. We don't need to use the synchronized keyword to ensure thread
+ * safety here. Since filters come from the source configuration, they shouldn’t be changed
+ * during runtime. So the result will always be idempotent.
+ */
+ private static Map<Selectors, String> toSelector(Map<String, String> filters) {
+ Map<Selectors, String> cached = cache.get(filters);
+ if (cached != null) {
+ return cached;
+ }
+
+ Map<Selectors, String> snapshotFilters = new HashMap<>();
+ filters.forEach(
+ (table, filter) -> {
+ Selectors selector =
+ new Selectors.SelectorsBuilder().includeTables(table).build();
+ snapshotFilters.put(selector, filter);
+ });
+ cache.put(filters, snapshotFilters);
+
+ return snapshotFilters;
+ }
+
+ public static String getSnapshotFilter(Map<String, String> filters, TableId tableId) {
+ Map<Selectors, String> snapshotFilters = toSelector(filters);
+
+ String filter = null;
+ for (Selectors selector : snapshotFilters.keySet()) {
+ if (selector.isMatch(
+ org.apache.flink.cdc.common.event.TableId.tableId(tableId.catalog(), tableId.table()))) {
+ filter = snapshotFilters.get(selector);
+ break;
+ }
Review Comment:
`getSnapshotFilter` picks the *first* matching selector from a `HashMap`
keySet iteration, which is non-deterministic when multiple patterns match the
same table. This can cause different filters to be applied across runs/JVMs.
Please make matching deterministic (e.g., preserve user-defined order via
`LinkedHashMap`/list, or define precedence like exact match > regex, or fail
fast on multiple matches).
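A sketch of an order-preserving lookup (assuming the filters map is built in user-defined order, e.g. as a `LinkedHashMap` from the configured list); class and method names are illustrative, not the PR's actual code:
```java
import org.apache.flink.cdc.common.schema.Selectors;

import javax.annotation.Nullable;

import java.util.Map;

/** Illustrative sketch: deterministic filter lookup that respects the caller's map order. */
final class OrderedSnapshotFilterLookup {

    /** Returns the filter of the first matching pattern, or null if no pattern matches. */
    @Nullable
    static String getSnapshotFilter(
            Map<String, String> filters, org.apache.flink.cdc.common.event.TableId tableId) {
        // Iterating the entry set preserves insertion order when 'filters' is a LinkedHashMap,
        // so the first user-defined pattern deterministically wins when several match.
        for (Map.Entry<String, String> entry : filters.entrySet()) {
            Selectors selector =
                    new Selectors.SelectorsBuilder().includeTables(entry.getKey()).build();
            if (selector.isMatch(tableId)) {
                return entry.getValue();
            }
        }
        return null;
    }
}
```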
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]