taoran92 commented on code in PR #4422:
URL: https://github.com/apache/flink-cdc/pull/4422#discussion_r3426452406
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java:
##########
@@ -284,6 +281,22 @@ public Predicate<TableId> getTableFilter() {
return tableId ->
tableFilters.dataCollectionFilter().isIncluded(tableId);
}
+ static Tables.TableFilter createCachedTableFilter(
+ Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
+ Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
+ return tableId ->
+ tableFilterCache.computeIfAbsent(
+ tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
+ }
Review Comment:
Good point. I replaced the unbounded ConcurrentHashMap with a bounded
LoadingCache (at most 1024 entries, each expiring 1 hour after its last
access), so hot TableIds stay cached while long-running jobs no longer retain
an unlimited number of dynamic table keys.

BTW, I kept the cache bounds as internal constants instead of adding new
connector options. This cache is purely an internal optimization: an eviction
only means the table filter is evaluated again for that TableId, and it does
not affect correctness. Exposing these values would expand the public
connector API without a clear user-facing semantic need.

The default maximum size follows the existing selector cache convention in
the codebase, and we can still expose or tune it later if real workloads show
that the defaults are insufficient.
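
To illustrate why eviction only costs a re-evaluation, here is a minimal
sketch of a size-bounded filter cache. It is not the code in this PR: the
class name BoundedFilterCache is hypothetical, and it swaps in a JDK-only
access-ordered LinkedHashMap (LRU eviction) in place of the LoadingCache,
with no time-based expiry.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical JDK-only stand-in for a size-bounded table filter cache. */
final class BoundedFilterCache<K> implements Predicate<K> {
    private final Predicate<K> delegate;
    private final Map<K, Boolean> cache;

    BoundedFilterCache(Predicate<K> delegate, int maxSize) {
        this.delegate = delegate;
        // Access-ordered LinkedHashMap: once the size exceeds maxSize,
        // the least recently used entry is dropped on the next insert.
        this.cache = Collections.synchronizedMap(
                new LinkedHashMap<K, Boolean>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                        return size() > maxSize;
                    }
                });
    }

    @Override
    public boolean test(K key) {
        // A miss (first lookup or after eviction) re-runs the delegate,
        // so the result always matches the uncached filter.
        return cache.computeIfAbsent(key, delegate::test);
    }

    int size() {
        return cache.size();
    }
}
```

With maxSize = 2, looking up a third distinct key evicts the least recently
used one. A later lookup of the evicted key calls the underlying filter again
and returns the same answer as before.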