This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 72345f0ec [FLINK-37507][source-connector/mysql] Fix MySQL CDC 
accidentally captures common-prefix database when 
`scan.binlog.newly-added-table` is enabled
72345f0ec is described below

commit 72345f0ec73c9cc3013d2804c9020244167bcb3b
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Apr 21 17:24:40 2025 +0800

    [FLINK-37507][source-connector/mysql] Fix MySQL CDC accidentally captures 
common-prefix database when `scan.binlog.newly-added-table` is enabled
    
    This closes #3957
---
 .../mysql/factory/MySqlDataSourceFactory.java      |  54 ++-
 .../source/MySqlTablePatternMatchingTest.java      | 451 +++++++++++++++++++++
 .../mysql/source/MySqlSourceTestBase.java          |   7 +
 3 files changed, 498 insertions(+), 14 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index cbc996aff..d2fb8d61c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -497,31 +497,57 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
                         distributionFactorLower));
     }
 
+    private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_";
+
     /**
      * Currently, The supported regular syntax is not exactly the same in 
{@link Selectors} and
      * {@link Tables.TableFilter}.
      *
      * <p>The main distinction are :
      *
-     * <p>1) {@link Selectors} use `,` to split table names and {@link 
Tables.TableFilter} use use
+     * <p>1) {@link Selectors} use {@code ,} to split table names and {@link 
Tables.TableFilter} use
      * `|` to split table names.
      *
-     * <p>2) If there is a need to use a dot (.) in a regular expression to 
match any character, it
-     * is necessary to escape the dot with a backslash, refer to {@link
+     * <p>2) If there is a need to use a dot ({@code .}) in a regular 
expression to match any
+     * character, it is necessary to escape the dot with a backslash, refer to 
{@link
      * MySqlDataSourceOptions#TABLES}.
+     *
+     * <p>3) The unescaped {@code .} is used as the separator of database and 
table name. When
+     * converting to Debezium style, it is expected to be escaped to match the 
dot ({@code .})
+     * literally instead of the meta-character.
      */
     private String validateTableAndReturnDebeziumStyle(String tables) {
-        // MySQL table names are not allowed to have `,` character.
-        if (tables.contains(",")) {
-            throw new IllegalArgumentException(
-                    "the `,` in "
-                            + tables
-                            + " is not supported when "
-                            + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED
-                            + " was enabled.");
-        }
-
-        return tables.replace("\\.", ".");
+        LOG.info("Rewriting CDC style table capture list: {}", tables);
+
+        // In CDC-style table matching, table names could be separated by `,` 
character.
+        // Convert it to `|` as it's standard RegEx syntax.
+        tables = tables.replace(",", "|");
+        LOG.info("Expression after replacing comma with vert separator: {}", 
tables);
+
+        // Essentially, we're just trying to swap escaped `\\.` and unescaped 
`.`.
+        // In our table matching syntax, `\\.` means RegEx token matcher and 
`.` means database &
+        // table name separator.
+        // On the contrary, while we're matching TableId string, `\\.` means 
matching the "dot"
+        // literal and `.` is the meta-character.
+
+        // Step 1: escape the dot with a backslash, but keep it as a 
placeholder (like `$`).
+        // For example, `db\.*.tbl\.*` => `db$*.tbl$*`
+        String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
+        LOG.info("Expression after unescaping dots as RegEx meta-character: 
{}", unescapedTables);
+
+        // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), 
as a separator between
+        // database and table names.
+        // For example, `db$*.tbl$*` => `db$*\.tbl$*`
+        String unescapedTablesWithDbTblSeparator = 
unescapedTables.replace(".", "\\.");
+        LOG.info("Re-escaping dots as TableId delimiter: {}", 
unescapedTablesWithDbTblSeparator);
+
+        // Step 3: restore placeholder to normal RegEx matcher (`.`)
+        // For example, `db$*\.tbl$*` => `db.*\.tbl.*`
+        String debeziumStyleTableCaptureList =
+                unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, 
".");
+        LOG.info("Final Debezium-style table capture list: {}", 
debeziumStyleTableCaptureList);
+
+        return debeziumStyleTableCaptureList;
     }
 
     /** Replaces the default timezone placeholder with session timezone, if 
applicable. */
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTablePatternMatchingTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTablePatternMatchingTest.java
new file mode 100644
index 000000000..dfffe05de
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTablePatternMatchingTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.loopCheck;
+
+/** Test cases for matching MySQL source tables. */
+class MySqlTablePatternMatchingTest extends MySqlSourceTestBase {
+
+    private final PrintStream standardOut = System.out;
+
+    private static final List<Tuple2<String, String>> TEST_TABLES =
+            Arrays.asList(
+                    Tuple2.of("db", "tbl1"),
+                    Tuple2.of("db", "tbl2"),
+                    Tuple2.of("db", "tbl3"),
+                    Tuple2.of("db", "tbl4"),
+                    Tuple2.of("db2", "tbl2"),
+                    Tuple2.of("db3", "tbl3"),
+                    Tuple2.of("db4", "tbl4"));
+
+    @BeforeAll
+    static void initializeDatabase() {
+        initializeMySqlTables(TEST_TABLES);
+    }
+
+    @AfterAll
+    static void tearDownDatabase() {
+        tearDownMySqlTables(TEST_TABLES);
+    }
+
+    @Test
+    void testWildcardMatching() {
+        Assertions.assertThat(testGenericTableMatching("\\.*.\\.*", null, 
false))
+                .containsExactlyInAnyOrder(
+                        "db.tbl1",
+                        "db.tbl2",
+                        "db.tbl3",
+                        "db.tbl4",
+                        "db2.tbl2",
+                        "db3.tbl3",
+                        "db4.tbl4");
+
+        Assertions.assertThat(testGenericTableMatching("\\.*.\\.*", null, 
true))
+                .containsExactlyInAnyOrder(".*\\..*");
+    }
+
+    @Test
+    void testWildcardMatchingDatabases() {
+        Assertions.assertThat(testGenericTableMatching("\\.*.tbl[3-4]", null, 
false))
+                .containsExactlyInAnyOrder("db.tbl3", "db.tbl4", "db3.tbl3", 
"db4.tbl4");
+
+        Assertions.assertThat(testGenericTableMatching("\\.*.tbl[3-4]", null, 
true))
+                .containsExactlyInAnyOrder(".*\\.tbl[3-4]");
+    }
+
+    @Test
+    void testWildcardMatchingTables() {
+        Assertions.assertThat(testGenericTableMatching("db.\\.*", null, false))
+                .containsExactlyInAnyOrder("db.tbl1", "db.tbl2", "db.tbl3", 
"db.tbl4");
+
+        Assertions.assertThat(testGenericTableMatching("db.\\.*", null, true))
+                .containsExactlyInAnyOrder("db\\..*");
+    }
+
+    @Test
+    void testWildcardMatchingPartialDatabases() {
+        // `db.` matches `db2`, `db3`, `db4` but not `db`
+        Assertions.assertThat(testGenericTableMatching("db\\..\\.*", null, 
false))
+                .containsExactlyInAnyOrder("db2.tbl2", "db3.tbl3", "db4.tbl4");
+
+        Assertions.assertThat(testGenericTableMatching("db\\..\\.*", null, 
true))
+                .containsExactlyInAnyOrder("db.\\..*");
+    }
+
+    @Test
+    void testWildcardMatchingWithExclusion() {
+        Assertions.assertThat(testGenericTableMatching("\\.*.\\.*", "db.tbl3", 
false))
+                .containsExactlyInAnyOrder(
+                        "db.tbl1", "db.tbl2", "db.tbl4", "db2.tbl2", 
"db3.tbl3", "db4.tbl4");
+    }
+
+    @Test
+    void testWildcardMatchingDatabasesWithExclusion() {
+        Assertions.assertThat(testGenericTableMatching("\\.*.tbl[3-4]", 
"db.tbl[3-4]", false))
+                .containsExactlyInAnyOrder("db3.tbl3", "db4.tbl4");
+    }
+
+    @Test
+    void testWildcardMatchingTablesWithExclusion() {
+        Assertions.assertThat(testGenericTableMatching("db.\\.*", "db.tbl4", 
false))
+                .containsExactlyInAnyOrder("db.tbl1", "db.tbl2", "db.tbl3");
+    }
+
+    @Test
+    void testWildcardMatchingPartialDatabasesWithExclusion() {
+        // `db.` matches `db2`, `db3`, `db4` but not `db`
+        Assertions.assertThat(testGenericTableMatching("db\\..\\.*", 
"db3.\\.*", false))
+                .containsExactlyInAnyOrder("db2.tbl2", "db4.tbl4");
+    }
+
+    @Test
+    void testMatchingTablesWithMultipleRules() {
+        
Assertions.assertThat(testGenericTableMatching("db.tbl1,db2.tbl\\.*,db3.tbl3", 
null, false))
+                .containsExactlyInAnyOrder("db.tbl1", "db2.tbl2", "db3.tbl3");
+
+        
Assertions.assertThat(testGenericTableMatching("db.tbl1,db2.tbl\\.*,db3.tbl3", 
null, true))
+                .containsExactlyInAnyOrder("db\\.tbl1|db2\\.tbl.*|db3\\.tbl3");
+    }
+
+    @Test
+    void testWildcardMatchingRealTables() throws Exception {
+        String[] expected =
+                new String[] {
+                    "CreateTableEvent{tableId=db.tbl1, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl1, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl2, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl2, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl3, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl4, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl4, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db2.tbl2, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db2.tbl2, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db3.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db3.tbl3, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db4.tbl4, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db4.tbl4, before=[], after=[17], 
op=INSERT, meta=()}"
+                };
+
+        Assertions.assertThat(getRealWorldMatchedTables("\\.*.\\.*", null, 
false, expected.length))
+                .containsExactlyInAnyOrder(expected);
+
+        Assertions.assertThat(getRealWorldMatchedTables("\\.*.\\.*", null, 
true, expected.length))
+                .containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
+    void testWildcardMatchingDatabasesRealTables() throws Exception {
+        String[] expected =
+                new String[] {
+                    "CreateTableEvent{tableId=db.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl3, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl4, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl4, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db3.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db3.tbl3, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db4.tbl4, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db4.tbl4, before=[], after=[17], 
op=INSERT, meta=()}"
+                };
+
+        Assertions.assertThat(
+                        getRealWorldMatchedTables("\\.*.tbl[3-4]", null, 
false, expected.length))
+                .containsExactlyInAnyOrder(expected);
+
+        Assertions.assertThat(
+                        getRealWorldMatchedTables("\\.*.tbl[3-4]", null, true, 
expected.length))
+                .containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
+    void testWildcardMatchingTablesRealTables() throws Exception {
+        String[] expected =
+                new String[] {
+                    "CreateTableEvent{tableId=db.tbl1, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl1, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl2, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl2, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl3, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db.tbl4, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl4, before=[], after=[17], 
op=INSERT, meta=()}"
+                };
+
+        Assertions.assertThat(getRealWorldMatchedTables("db.\\.*", null, 
false, expected.length))
+                .containsExactlyInAnyOrder(expected);
+
+        Assertions.assertThat(getRealWorldMatchedTables("db.\\.*", null, true, 
expected.length))
+                .containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
+    void testWildcardMatchingPartialDatabasesRealTables() throws Exception {
+        String[] expected =
+                new String[] {
+                    "CreateTableEvent{tableId=db2.tbl2, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db2.tbl2, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db3.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db3.tbl3, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db4.tbl4, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db4.tbl4, before=[], after=[17], 
op=INSERT, meta=()}"
+                };
+
+        // `db.` matches `db2`, `db3`, `db4` but not `db`
+        Assertions.assertThat(getRealWorldMatchedTables("db\\..\\.*", null, 
false, expected.length))
+                .containsExactlyInAnyOrder(expected);
+
+        Assertions.assertThat(getRealWorldMatchedTables("db\\..\\.*", null, 
true, expected.length))
+                .containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
+    void testMultipleRulesWithRealTables() throws Exception {
+        String[] expected =
+                new String[] {
+                    "CreateTableEvent{tableId=db.tbl1, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db.tbl1, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db2.tbl2, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db2.tbl2, before=[], after=[17], 
op=INSERT, meta=()}",
+                    "CreateTableEvent{tableId=db3.tbl3, schema=columns={`id` 
INT NOT NULL}, primaryKeys=id, options=()}",
+                    "DataChangeEvent{tableId=db3.tbl3, before=[], after=[17], 
op=INSERT, meta=()}"
+                };
+
+        Assertions.assertThat(
+                        getRealWorldMatchedTables(
+                                "db.tbl1,db2.tbl\\.*,db3.tbl3", null, false, 
expected.length))
+                .containsExactlyInAnyOrder(expected);
+
+        Assertions.assertThat(
+                        getRealWorldMatchedTables(
+                                "db.tbl1,db2.tbl\\.*,db3.tbl3", null, true, 
expected.length))
+                .containsExactlyInAnyOrder(expected);
+    }
+
+    private static void initializeMySqlTables(List<Tuple2<String, String>> 
tableNames) {
+        tableNames.forEach(
+                tableName -> {
+                    try (Connection connection =
+                                    DriverManager.getConnection(
+                                            MYSQL_CONTAINER.getJdbcUrl(),
+                                            TEST_USER,
+                                            TEST_PASSWORD);
+                            Statement statement = 
connection.createStatement()) {
+                        statement.execute(
+                                String.format("CREATE DATABASE IF NOT EXISTS 
`%s`;", tableName.f0));
+                        statement.execute(
+                                String.format(
+                                        "CREATE TABLE IF NOT EXISTS `%s`.`%s` 
(id INT PRIMARY KEY NOT NULL);",
+                                        tableName.f0, tableName.f1));
+
+                        statement.execute(
+                                String.format(
+                                        "INSERT INTO `%s`.`%s` VALUES (17);",
+                                        tableName.f0, tableName.f1));
+                    } catch (SQLException e) {
+                        throw new RuntimeException("Failed to initialize 
databases and tables", e);
+                    }
+                });
+    }
+
+    private static void tearDownMySqlTables(List<Tuple2<String, String>> 
tableNames) {
+        tableNames.forEach(
+                tableName -> {
+                    try (Connection connection =
+                                    DriverManager.getConnection(
+                                            MYSQL_CONTAINER.getJdbcUrl(),
+                                            TEST_USER,
+                                            TEST_PASSWORD);
+                            Statement statement = 
connection.createStatement()) {
+                        statement.execute(
+                                String.format("DROP DATABASE IF EXISTS `%s`;", 
tableName.f0));
+                    } catch (SQLException e) {
+                        throw new RuntimeException("Failed to clean-up 
databases", e);
+                    }
+                });
+    }
+
+    private List<String> testGenericTableMatching(
+            String tablesConfig,
+            @Nullable String tablesExclude,
+            boolean scanBinlogNewlyAddedTable) {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), tablesConfig);
+        options.put(
+                SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(),
+                String.valueOf(scanBinlogNewlyAddedTable));
+        if (tablesExclude != null) {
+            options.put(TABLES_EXCLUDE.key(), tablesExclude);
+        }
+        Factory.Context context = new 
MockContext(Configuration.fromMap(options));
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) 
factory.createDataSource(context);
+        return dataSource.getSourceConfig().getTableList();
+    }
+
+    class MockContext implements Factory.Context {
+
+        Configuration factoryConfiguration;
+
+        public MockContext(Configuration factoryConfiguration) {
+            this.factoryConfiguration = factoryConfiguration;
+        }
+
+        @Override
+        public Configuration getFactoryConfiguration() {
+            return factoryConfiguration;
+        }
+
+        @Override
+        public Configuration getPipelineConfiguration() {
+            return null;
+        }
+
+        @Override
+        public ClassLoader getClassLoader() {
+            return this.getClassLoader();
+        }
+    }
+
+    private List<String> getRealWorldMatchedTables(
+            String tables,
+            @Nullable String tablesExclude,
+            boolean scanBinlogNewlyAddedTable,
+            int expectedCount)
+            throws Exception {
+        try (ByteArrayOutputStream outCaptor = new ByteArrayOutputStream()) {
+            System.setOut(new PrintStream(outCaptor));
+            FlinkPipelineComposer composer = 
FlinkPipelineComposer.ofMiniCluster();
+
+            // Setup MySQL source
+            Configuration sourceConfig = new Configuration();
+            sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, 
MYSQL_CONTAINER.getHost());
+            sourceConfig.set(MySqlDataSourceOptions.PORT, 
MYSQL_CONTAINER.getDatabasePort());
+            sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
+            sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
+            sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
+            sourceConfig.set(MySqlDataSourceOptions.TABLES, tables);
+            if (tablesExclude != null) {
+                sourceConfig.set(MySqlDataSourceOptions.TABLES_EXCLUDE, 
tablesExclude);
+            }
+            sourceConfig.set(
+                    
MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED,
+                    scanBinlogNewlyAddedTable);
+            sourceConfig.set(MySqlDataSourceOptions.SERVER_ID, getServerId(1));
+
+            SourceDef sourceDef =
+                    new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL 
Source", sourceConfig);
+
+            // Setup value sink
+            Configuration sinkConfig = new Configuration();
+            sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+            SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+            // Setup pipeline
+            Configuration pipelineConfig = new Configuration();
+            pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+            pipelineConfig.set(
+                    PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+            PipelineDef pipelineDef =
+                    new PipelineDef(
+                            sourceDef,
+                            sinkDef,
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            pipelineConfig);
+
+            // Execute the pipeline
+            PipelineExecution execution = composer.compose(pipelineDef);
+            Thread executeThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    execution.execute();
+                                } catch (InterruptedException ignored) {
+
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                            });
+
+            executeThread.start();
+
+            try {
+                loopCheck(
+                        () -> outCaptor.toString().trim().split("\n").length 
>= expectedCount,
+                        "collect enough rows",
+                        Duration.ofSeconds(120),
+                        Duration.ofSeconds(1));
+            } finally {
+                executeThread.interrupt();
+            }
+            return Arrays.asList(outCaptor.toString().trim().split("\n"));
+        } finally {
+            System.setOut(standardOut);
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
index 3b51104e9..08ccfdcce 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
@@ -43,6 +43,7 @@ import org.testcontainers.lifecycle.Startables;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Stream;
 
 /** Basic class for testing {@link MySqlSource}. */
@@ -180,4 +181,10 @@ public abstract class MySqlSourceTestBase extends 
TestLogger {
             }
         }
     }
+
+    protected String getServerId(int parallelism) {
+        final Random random = new Random();
+        int serverId = random.nextInt(100) + 5400;
+        return serverId + "-" + (serverId + parallelism);
+    }
 }

Reply via email to