taoran92 commented on code in PR #4422:
URL: https://github.com/apache/flink-cdc/pull/4422#discussion_r3426452406


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java:
##########
@@ -284,6 +281,22 @@ public Predicate<TableId> getTableFilter() {
         return tableId -> tableFilters.dataCollectionFilter().isIncluded(tableId);
     }
 
+    static Tables.TableFilter createCachedTableFilter(
+            Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
+        Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
+        return tableId ->
+                tableFilterCache.computeIfAbsent(
+                        tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
+    }

Review Comment:
   Good point. I replaced the unbounded ConcurrentHashMap with a bounded 
LoadingCache (max 1024 entries, each expiring 1h after its last access), so hot 
TableIds stay cached while long-running jobs won't retain an unbounded number 
of dynamic table keys.
   
   BTW, I kept the cache bounds as internal constants rather than new connector 
options because this is only an internal optimization: eviction affects 
performance but not correctness. Adding public options would expand the 
connector API surface without a clear user-facing semantic need. We can expose 
them later if real workloads show the defaults need tuning.
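
   To illustrate the "eviction affects performance but not correctness" point, 
here is a minimal stdlib-only sketch. It is not the code in this PR: it swaps 
the LoadingCache for an access-ordered LinkedHashMap, bounds the cache by size 
only (no time-based expiry), and the names BoundedFilterCache and 
cachedEntries are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical stand-in for a bounded filter cache: memoizes a predicate's
 * results and evicts the least-recently-used entry once maxEntries is exceeded.
 */
final class BoundedFilterCache<K> implements Predicate<K> {
    private final Predicate<K> delegate;
    private final Map<K, Boolean> cache;

    BoundedFilterCache(Predicate<K> delegate, int maxEntries) {
        this.delegate = delegate;
        // accessOrder=true keeps entries ordered from least to most recently used.
        this.cache = new LinkedHashMap<K, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                // Called after each put; returning true drops the LRU entry.
                return size() > maxEntries;
            }
        };
    }

    @Override
    public synchronized boolean test(K key) {
        Boolean cached = cache.get(key);
        if (cached == null) {
            // Cache miss (first lookup or evicted earlier): recompute and store.
            cached = delegate.test(key);
            cache.put(key, cached);
        }
        return cached;
    }

    /** Number of entries currently held; never exceeds maxEntries. */
    synchronized int cachedEntries() {
        return cache.size();
    }
}
```

   An evicted key is simply re-evaluated by the underlying filter on its next 
lookup, so the answer is unchanged and only the cost of that lookup differs.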



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
