taoran92 commented on code in PR #4422:
URL: https://github.com/apache/flink-cdc/pull/4422#discussion_r3426452406


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java:
##########
@@ -284,6 +281,22 @@ public Predicate<TableId> getTableFilter() {
         return tableId -> tableFilters.dataCollectionFilter().isIncluded(tableId);
     }
 
+    static Tables.TableFilter createCachedTableFilter(
+            Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
+        Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
+        return tableId ->
+                tableFilterCache.computeIfAbsent(
+                        tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
+    }

Review Comment:
   Good point. I replaced the unbounded ConcurrentHashMap with a bounded 
LoadingCache (at most 1024 entries, each expiring 1 hour after its last access), 
so frequently seen TableIds stay cached while long-running jobs no longer retain 
an unbounded set of dynamically discovered table keys.
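   A minimal, JDK-only sketch of the bounded cache described above, for 
illustration only. The comment does not say which LoadingCache implementation 
is used, so this sketch avoids any library: a `LinkedHashMap` in access order 
gives LRU eviction at the size bound, and a per-entry last-access timestamp 
gives expire-after-access. `BoundedCachedFilter`, `CachedResult`, and the 
generic key `K` (standing in for `TableId`) are hypothetical names, and the 
delegate predicate stands in for `isTableIncluded(tableFilter, 
excludeTableFilter, id)`. A single `synchronized` lock keeps the sketch simple; 
a library cache would typically allow more concurrency.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of a bounded, memoizing table filter.
 * K stands in for TableId; the delegate stands in for the combined
 * include/exclude check (isTableIncluded in the diff above).
 */
final class BoundedCachedFilter<K> implements Predicate<K> {
    // Internal constants mirroring the bounds named in the review comment.
    static final int MAX_ENTRIES = 1024;
    static final long EXPIRE_AFTER_ACCESS_MILLIS = 60L * 60L * 1000L; // 1 hour

    /** Cached filter decision plus the time it was last read. */
    private static final class CachedResult {
        final boolean included;
        long lastAccessMillis;

        CachedResult(boolean included, long nowMillis) {
            this.included = included;
            this.lastAccessMillis = nowMillis;
        }
    }

    private final Predicate<K> delegate;
    private final int maxEntries;
    private final long expireAfterAccessMillis;
    private final LongSupplier clock;
    private final LinkedHashMap<K, CachedResult> cache;

    BoundedCachedFilter(Predicate<K> delegate) {
        this(delegate, MAX_ENTRIES, EXPIRE_AFTER_ACCESS_MILLIS, System::currentTimeMillis);
    }

    BoundedCachedFilter(
            Predicate<K> delegate, int maxEntries, long expireAfterAccessMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.maxEntries = maxEntries;
        this.expireAfterAccessMillis = expireAfterAccessMillis;
        this.clock = clock;
        // accessOrder = true keeps the least recently accessed entry first,
        // so removeEldestEntry gives LRU eviction once the size bound is exceeded.
        this.cache =
                new LinkedHashMap<K, CachedResult>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, CachedResult> eldest) {
                        return size() > BoundedCachedFilter.this.maxEntries;
                    }
                };
    }

    @Override
    public synchronized boolean test(K key) {
        long now = clock.getAsLong();
        CachedResult cached = cache.get(key); // also marks the key as most recently used
        if (cached == null || now - cached.lastAccessMillis > expireAfterAccessMillis) {
            // Miss or expired entry: evaluate the underlying filter again.
            cached = new CachedResult(delegate.test(key), now);
            cache.put(key, cached);
        } else {
            cached.lastAccessMillis = now;
        }
        return cached.included;
    }

    /** Entries currently held; expired ones linger until re-read or evicted by size. */
    synchronized int cachedEntries() {
        return cache.size();
    }
}
```

   Because the delegate depends only on the table id, evicting or expiring an 
entry costs one extra evaluation of the filter and never changes the answer 
for that id.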
   
   BTW, I kept the cache bounds as internal constants instead of adding new 
connector options. This cache is purely an internal optimization: eviction may 
cause the table filter to be evaluated again, but it does not affect 
correctness. Exposing these values would expand the public connector API 
without a clear user-facing need.
   
   The default maximum size follows the existing selector cache convention in 
the codebase, and we can still expose or tune it later if real workloads show 
the defaults are insufficient.



-- 
This is an automated message from the Apache Git Service.