taoran92 commented on code in PR #4422:
URL: https://github.com/apache/flink-cdc/pull/4422#discussion_r3426452406
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java:
##########
@@ -284,6 +281,22 @@ public Predicate<TableId> getTableFilter() {
return tableId ->
tableFilters.dataCollectionFilter().isIncluded(tableId);
}
+ static Tables.TableFilter createCachedTableFilter(
+ Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
+ Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
+ return tableId ->
+ tableFilterCache.computeIfAbsent(
+ tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
+ }
Review Comment:
Good point. I replaced the unbounded ConcurrentHashMap with a bounded
LoadingCache (at most 1024 entries, each expiring 1h after its last access), so hot
TableIds stay cached while long-running jobs no longer retain an unlimited number
of dynamic table keys.
BTW, I kept the cache bounds as internal constants rather than new connector
options because this is purely an internal optimization: eviction affects
performance but not correctness. Adding public options would expand the
connector API surface without a clear user-facing need. We can expose
them later if real workloads show that the defaults need tuning.
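To illustrate what the two bounds do, here is a minimal stdlib-only Java sketch. It is not the PR's code. The PR uses a LoadingCache (presumably Guava's, built with something like `CacheBuilder.newBuilder().maximumSize(1024).expireAfterAccess(1, TimeUnit.HOURS)`). This stand-in approximates the same behavior with an access-ordered `LinkedHashMap` (LRU eviction) plus a per-entry last-access timestamp. The class name `BoundedFilterCache`, the injectable clock, and the `loads()` counter are hypothetical and exist only for illustration. It also wraps a plain `Predicate` rather than the connector's `Tables.TableFilter`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

/**
 * Hypothetical stand-in for a bounded, access-expiring cache in front of a table filter.
 * Holds at most maxEntries keys (least recently accessed evicted first). An entry not
 * accessed for expireAfterAccessNanos is recomputed by the delegate on its next lookup.
 */
final class BoundedFilterCache<K> implements Predicate<K> {
    private static final class Cached {
        final boolean included;
        long lastAccessNanos;

        Cached(boolean included, long lastAccessNanos) {
            this.included = included;
            this.lastAccessNanos = lastAccessNanos;
        }
    }

    private final Predicate<K> delegate;
    private final LongSupplier clock;
    private final long expireAfterAccessNanos;
    private final Map<K, Cached> entries;
    private long loadCount; // number of delegate evaluations, for illustration only

    BoundedFilterCache(
            Predicate<K> delegate, int maxEntries, long expireAfterAccessNanos, LongSupplier clock) {
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        this.delegate = delegate;
        this.clock = clock;
        this.expireAfterAccessNanos = expireAfterAccessNanos;
        // accessOrder = true: iteration order is least recently accessed first, so the
        // "eldest" entry removed below is the LRU one.
        this.entries = new LinkedHashMap<K, Cached>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Cached> eldest) {
                return size() > maxEntries;
            }
        };
    }

    @Override
    public synchronized boolean test(K key) {
        long now = clock.getAsLong();
        Cached cached = entries.get(key); // a hit also refreshes the LRU position
        if (cached == null || now - cached.lastAccessNanos >= expireAfterAccessNanos) {
            // Miss, evicted, or expired: recompute. The answer is the same as before,
            // only the cost of computing it is paid again.
            loadCount++;
            cached = new Cached(delegate.test(key), now);
            entries.put(key, cached);
        } else {
            cached.lastAccessNanos = now;
        }
        return cached.included;
    }

    synchronized int cachedEntries() {
        return entries.size();
    }

    synchronized long loads() {
        return loadCount;
    }
}
```

Because an evicted or expired key is simply recomputed by the delegate filter, the answer for a given table never changes; only the number of delegate calls does, which is why eviction affects performance but not correctness. The sketch uses `synchronized` to stay thread-safe like the original `ConcurrentHashMap` version; a production cache would use finer-grained concurrency.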