Hisoka-X commented on code in PR #8323:
URL: https://github.com/apache/seatunnel/pull/8323#discussion_r1897058560


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java:
##########
@@ -99,9 +110,47 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
             TableSource<T, SplitT, StateT> restoreSource(
                     TableSourceFactoryContext context, List<CatalogTable> 
restoreTables) {
         return () -> {
+            ReadonlyConfig config = context.getOptions();
+            Optional<String> optional = 
config.getOptional(CatalogOptions.TABLE_PATTERN);
+            if (optional.isPresent()) {
+                List<String> tableNames = new ArrayList<>();
+                Pattern tablePattern = 
Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN));
+                try (Connection connection =
+                                DriverManager.getConnection(
+                                        
config.get(JdbcCatalogOptions.BASE_URL),
+                                        
config.get(JdbcCatalogOptions.USERNAME),
+                                        
config.get(JdbcCatalogOptions.PASSWORD));
+                        Statement statement = connection.createStatement()) {
+                    ResultSet resultSet =
+                            statement.executeQuery(
+                                    "select concat(table_schema,'.', 
table_name) as table_name from information_schema.tables where table_schema not 
in ('sys', 'information_schema', 'mysql', 'performance_schema')");
+                    while (resultSet.next()) {
+                        String tableName = resultSet.getString("table_name");
+                        if (tablePattern.matcher(tableName).matches()) {
+                            tableNames.add(tableName);
+                        }
+                    }
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+                if (tableNames.isEmpty()) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "[%s] No tables matched the pattern: ",
+                                    tablePattern.pattern()));
+                }
+                Map<String, Object> sourceMap = config.getSourceMap();
+                sourceMap.remove(CatalogOptions.TABLE_PATTERN.key());
+                if 
(config.getOptional(JdbcSourceOptions.DATABASE_NAMES).isPresent()) {
+                    log.warn(
+                            "Using `table-pattern` is `database-names` will be 
invalid, please remove it.");

Review Comment:
   ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to