This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new afe9c3c0a [FLINK-36150][pipeline-connector/mysql] tables.exclude 
should work even when scan.binlog.newly-added-table.enabled is true
afe9c3c0a is described below

commit afe9c3c0adb77594565b24c004daa6385d50d5fc
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Aug 28 00:33:09 2024 +0800

    [FLINK-36150][pipeline-connector/mysql] tables.exclude should work even 
when scan.binlog.newly-added-table.enabled is true
    
    This closes #3573.
---
 .../mysql/factory/MySqlDataSourceFactory.java      |  25 +++--
 .../source/reader/MySqlPipelineRecordEmitter.java  |   4 +-
 .../mysql/source/MySqlDataSourceFactoryTest.java   |  22 ++++
 .../source/MysqlPipelineNewlyAddedTableITCase.java |  66 ++++++++++--
 .../connectors/mysql/debezium/DebeziumUtils.java   |   4 +-
 .../mysql/debezium/reader/BinlogSplitReader.java   |  12 +--
 .../cdc/connectors/mysql/schema/Selectors.java     | 119 +++++++++++++++++++++
 .../assigners/MySqlSnapshotSplitAssigner.java      |   6 +-
 .../mysql/source/config/MySqlSourceConfig.java     |  23 ++++
 .../source/config/MySqlSourceConfigFactory.java    |   7 ++
 .../mysql/source/reader/MySqlSourceReader.java     |   5 +-
 .../mysql/source/utils/TableDiscoveryUtils.java    |  19 ++--
 12 files changed, 274 insertions(+), 38 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 5f538eeb1..118e8cdb1 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
 import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -43,6 +42,8 @@ import io.debezium.relational.Tables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -171,12 +172,21 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
                         .jdbcProperties(getJdbcProperties(configMap))
                         
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
 
+        List<TableId> tableIds = 
MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
+
+        if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
+            throw new IllegalArgumentException(
+                    "If both scan.binlog.newly-added-table.enabled and 
scan.newly-added-table.enabled are true, data maybe duplicate after restore");
+        }
+
         if (scanBinlogNewlyAddedTableEnabled) {
             String newTables = validateTableAndReturnDebeziumStyle(tables);
             configFactory.tableList(newTables);
+            configFactory.excludeTableList(tablesExclude);
+
         } else {
             Selectors selectors = new 
Selectors.SelectorsBuilder().includeTables(tables).build();
-            List<String> capturedTables = 
getTableList(configFactory.createConfig(0), selectors);
+            List<String> capturedTables = getTableList(tableIds, selectors);
             if (capturedTables.isEmpty()) {
                 throw new IllegalArgumentException(
                         "Cannot find any table by the option 'tables' = " + 
tables);
@@ -184,8 +194,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
             if (tablesExclude != null) {
                 Selectors selectExclude =
                         new 
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
-                List<String> excludeTables =
-                        getTableList(configFactory.createConfig(0), 
selectExclude);
+                List<String> excludeTables = getTableList(tableIds, 
selectExclude);
                 if (!excludeTables.isEmpty()) {
                     capturedTables.removeAll(excludeTables);
                 }
@@ -201,8 +210,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         String chunkKeyColumns = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         if (chunkKeyColumns != null) {
             Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
-            List<TableId> tableIds =
-                    MySqlSchemaUtils.listTables(configFactory.createConfig(0), 
null);
+
             for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
                 String[] splits = chunkKeyColumn.split(":");
                 if (splits.length == 2) {
@@ -284,8 +292,9 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
     private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = 
"specific-offset";
     private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = 
"timestamp";
 
-    private static List<String> getTableList(MySqlSourceConfig sourceConfig, 
Selectors selectors) {
-        return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
+    private static List<String> getTableList(
+            @Nullable List<TableId> tableIdList, Selectors selectors) {
+        return tableIdList.stream()
                 .filter(selectors::isMatch)
                 .map(TableId::toString)
                 .collect(Collectors.toList());
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 909ed6c5b..4f801e0fa 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -237,7 +237,9 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
     private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig 
sourceConfig) {
         try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
             List<CreateTableEvent> createTableEventCache = new ArrayList<>();
-            List<TableId> capturedTableIds = listTables(jdbc, 
sourceConfig.getTableFilters());
+            List<TableId> capturedTableIds =
+                    listTables(
+                            jdbc, sourceConfig.getDatabaseFilter(), 
sourceConfig.getTableFilter());
             for (TableId tableId : capturedTableIds) {
                 Schema schema = getSchema(jdbc, tableId);
                 createTableEventCache.add(
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index c7b442480..277bb74ac 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -39,10 +39,12 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -71,6 +73,26 @@ public class MySqlDataSourceFactoryTest extends 
MySqlSourceTestBase {
                 .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + 
".products"));
     }
 
+    @Test
+    public void testCreateSourceScanBinlogNewlyAddedTableEnabled() {
+        inventoryDatabase.createAndInitialize();
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".prod\\.*");
+        options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        Factory.Context context = new 
MockContext(Configuration.fromMap(options));
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        assertThatThrownBy(() -> factory.createDataSource(context))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "If both scan.binlog.newly-added-table.enabled and 
scan.newly-added-table.enabled are true, data maybe duplicate after restore");
+    }
+
     @Test
     public void testNoMatchedTable() {
         inventoryDatabase.createAndInitialize();
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
index fcd80440d..87e44d240 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -60,6 +60,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -91,6 +92,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
@@ -157,12 +159,12 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
     public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
         List<String> tables = Collections.singletonList("address_\\.*");
         Map<String, String> options = new HashMap<>();
-        options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
         options.put(SCAN_STARTUP_MODE.key(), "timestamp");
         options.put(
                 SCAN_STARTUP_TIMESTAMP_MILLIS.key(), 
String.valueOf(System.currentTimeMillis()));
 
-        FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, 
options);
+        FlinkSourceProvider sourceProvider =
+                getFlinkSourceProvider(tables, 4, options, false, true);
         StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
         env.enableCheckpointing(200);
@@ -194,6 +196,47 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         assertThat(tableNames.get(1)).isEqualTo("address_shanghai");
     }
 
+    @Test
+    public void testScanBinlogNewlyAddedTableEnabledAndExcludeTables() throws 
Exception {
+        List<String> tables = Collections.singletonList("address_\\.*");
+        Map<String, String> options = new HashMap<>();
+        options.put(TABLES_EXCLUDE.key(), customDatabase.getDatabaseName() + 
".address_beijing");
+        options.put(SCAN_STARTUP_MODE.key(), "timestamp");
+        options.put(
+                SCAN_STARTUP_TIMESTAMP_MILLIS.key(), 
String.valueOf(System.currentTimeMillis()));
+
+        FlinkSourceProvider sourceProvider =
+                getFlinkSourceProvider(tables, 4, options, false, true);
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+        env.enableCheckpointing(200);
+        DataStreamSource<Event> source =
+                env.fromSource(
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(env.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(env, source, resultBuffer, serializer, 
accumulatorName);
+        env.executeAsync("AddNewlyTablesWhenReadingBinlog");
+        initialAddressTables(
+                getConnection(), Lists.newArrayList("address_beijing", 
"address_shanghai"));
+        List<Event> actual = fetchResults(iterator, 4);
+        List<String> tableNames =
+                actual.stream()
+                        .filter((event) -> event instanceof CreateTableEvent)
+                        .map((event) -> ((SchemaChangeEvent) 
event).tableId().getTableName())
+                        .collect(Collectors.toList());
+        assertThat(tableNames.size()).isEqualTo(1);
+        assertThat(tableNames.get(0)).isEqualTo("address_shanghai");
+    }
+
     @Test
     public void testAddNewTableOneByOneSingleParallelism() throws Exception {
         TestParam testParam =
@@ -273,7 +316,8 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         List<String> listenTablesFirstRound = 
testParam.getFirstRoundListenTables();
 
         FlinkSourceProvider sourceProvider =
-                getFlinkSourceProvider(listenTablesFirstRound, parallelism, 
new HashMap<>());
+                getFlinkSourceProvider(
+                        listenTablesFirstRound, parallelism, new HashMap<>(), 
true, false);
         DataStreamSource<Event> source =
                 env.fromSource(
                         sourceProvider.getSource(),
@@ -317,7 +361,8 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
                 getStreamExecutionEnvironment(finishedSavePointPath, 
parallelism);
         List<String> listenTablesSecondRound = 
testParam.getSecondRoundListenTables();
         FlinkSourceProvider restoredSourceProvider =
-                getFlinkSourceProvider(listenTablesSecondRound, parallelism, 
new HashMap<>());
+                getFlinkSourceProvider(
+                        listenTablesSecondRound, parallelism, new HashMap<>(), 
true, false);
         DataStreamSource<Event> restoreSource =
                 restoredEnv.fromSource(
                         restoredSourceProvider.getSource(),
@@ -478,7 +523,11 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
     }
 
     private FlinkSourceProvider getFlinkSourceProvider(
-            List<String> tables, int parallelism, Map<String, String> 
additionalOptions) {
+            List<String> tables,
+            int parallelism,
+            Map<String, String> additionalOptions,
+            boolean enableScanNewlyAddedTable,
+            boolean enableBinlogScanNewlyAddedTable) {
         List<String> fullTableNames =
                 tables.stream()
                         .map(table -> customDatabase.getDatabaseName() + "." + 
table)
@@ -491,7 +540,12 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         options.put(SERVER_TIME_ZONE.key(), "UTC");
         options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
         options.put(SERVER_ID.key(), getServerId(parallelism));
-        options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        if (enableScanNewlyAddedTable) {
+            options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        }
+        if (enableBinlogScanNewlyAddedTable) {
+            options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        }
         options.putAll(additionalOptions);
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
index ac3b20c4f..7ca60be5b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
@@ -193,7 +193,9 @@ public class DebeziumUtils {
 
         final List<TableId> capturedTableIds;
         try {
-            capturedTableIds = TableDiscoveryUtils.listTables(jdbc, 
sourceConfig.getTableFilters());
+            capturedTableIds =
+                    TableDiscoveryUtils.listTables(
+                            jdbc, sourceConfig.getDatabaseFilter(), 
sourceConfig.getTableFilter());
         } catch (SQLException e) {
             throw new FlinkRuntimeException("Failed to discover captured 
tables", e);
         }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index e1ef3895f..31173469b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -40,7 +40,6 @@ import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -81,7 +80,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
     // tableId -> the max splitHighWatermark
     private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
     private final Set<TableId> pureBinlogPhaseTables;
-    private Tables.TableFilter capturedTableFilter;
+    private Predicate capturedTableFilter;
     private final StoppableChangeEventSourceContext changeEventSourceContext =
             new StoppableChangeEventSourceContext();
 
@@ -100,8 +99,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
         this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
         configureFilter();
         statefulTaskContext.configure(currentBinlogSplit);
-        this.capturedTableFilter =
-                
statefulTaskContext.getConnectorConfig().getTableFilters().dataCollectionFilter();
+        this.capturedTableFilter = 
statefulTaskContext.getSourceConfig().getTableFilter();
         this.queue = statefulTaskContext.getQueue();
         this.binlogSplitReadTask =
                 new MySqlBinlogSplitReadTask(
@@ -247,7 +245,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
         } else if (RecordUtils.isSchemaChangeEvent(sourceRecord)) {
             if (RecordUtils.isTableChangeRecord(sourceRecord)) {
                 TableId tableId = RecordUtils.getTableId(sourceRecord);
-                return capturedTableFilter.isIncluded(tableId);
+                return capturedTableFilter.test(tableId);
             } else {
                 // Not related to changes in table structure, like 
`CREATE/DROP DATABASE`, skip it
                 return false;
@@ -270,7 +268,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
         if 
(!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
             // the new added sharding table without history records
             return !maxSplitHighWatermarkMap.containsKey(tableId)
-                    && capturedTableFilter.isIncluded(tableId);
+                    && capturedTableFilter.test(tableId);
         }
         return false;
     }
@@ -280,7 +278,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
                 currentBinlogSplit.getFinishedSnapshotSplitInfos();
         Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new 
HashMap<>();
         Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
-        // specific offset mode
+        // startup mode which is stream only
         if (finishedSplitInfos.isEmpty()) {
             for (TableId tableId : 
currentBinlogSplit.getTableSchemas().keySet()) {
                 tableIdBinlogPositionMap.put(tableId, 
currentBinlogSplit.getStartingOffset());
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java
new file mode 100644
index 000000000..bf40b66d7
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.schema;
+
+import org.apache.flink.cdc.common.utils.Predicates;
+
+import io.debezium.relational.TableId;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/** Selectors for filtering tables. */
+public class Selectors {
+
+    private List<Selector> selectors;
+
+    private Selectors() {}
+
+    /**
+     * A {@link Selector} that determines whether a table identified by a 
given {@link TableId} is
+     * to be included.
+     */
+    private static class Selector {
+        private final Predicate<String> namespacePred;
+        private final Predicate<String> tableNamePred;
+
+        public Selector(String namespace, String tableName) {
+            this.namespacePred =
+                    namespace == null ? (namespacePred) -> false : 
Predicates.includes(namespace);
+            this.tableNamePred =
+                    tableName == null ? (tableNamePred) -> false : 
Predicates.includes(tableName);
+        }
+
+        public boolean isMatch(TableId tableId) {
+
+            String namespace = tableId.catalog();
+
+            if (namespace == null || namespace.isEmpty()) {
+                return tableNamePred.test(tableId.table());
+            }
+            return namespacePred.test(tableId.catalog()) && 
tableNamePred.test(tableId.table());
+        }
+    }
+
+    /** Match the {@link TableId} against the {@link Selector}s. * */
+    public boolean isMatch(TableId tableId) {
+        for (Selector selector : selectors) {
+            if (selector.isMatch(tableId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Builder for {@link Selectors}. */
+    public static class SelectorsBuilder {
+
+        private List<Selector> selectors;
+
+        /**
+         * Current {@link TableId} used in mysql cdc connector will map 
database name to catalog.
+         *
+         * @param tableInclusions
+         * @return
+         */
+        public SelectorsBuilder includeTables(String tableInclusions) {
+
+            if (tableInclusions == null || tableInclusions.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Invalid table inclusion pattern cannot be null or 
empty");
+            }
+
+            List<Selector> selectors = new ArrayList<>();
+            Set<String> tableSplitSet =
+                    Predicates.setOf(
+                            tableInclusions, 
Predicates.RegExSplitterByComma::split, (str) -> str);
+            for (String tableSplit : tableSplitSet) {
+                List<String> tableIdList =
+                        Predicates.listOf(
+                                tableSplit, 
Predicates.RegExSplitterByDot::split, (str) -> str);
+                Iterator<String> iterator = tableIdList.iterator();
+                if (tableIdList.size() == 1) {
+                    selectors.add(new Selector(null, iterator.next()));
+                } else if (tableIdList.size() == 2) {
+                    selectors.add(new Selector(iterator.next(), 
iterator.next()));
+                } else {
+                    throw new IllegalArgumentException(
+                            "Invalid table inclusion pattern: " + 
tableInclusions);
+                }
+            }
+            this.selectors = selectors;
+            return this;
+        }
+
+        public Selectors build() {
+            Selectors selectors = new Selectors();
+            selectors.selectors = this.selectors;
+            return selectors;
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 9ea69b11a..89985ae2f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -601,11 +601,7 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
             return new MySqlChunkSplitter(
                     mySqlSchema,
                     sourceConfig,
-                    tableId != null
-                                    && sourceConfig
-                                            .getTableFilters()
-                                            .dataCollectionFilter()
-                                            .isIncluded(tableId)
+                    tableId != null && 
sourceConfig.getTableFilter().test(tableId)
                             ? chunkSplitterState
                             : ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
         }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 2e19156e5..dd0ac7896 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.mysql.source.config;
 
+import org.apache.flink.cdc.connectors.mysql.schema.Selectors;
 import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -24,6 +25,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import io.debezium.config.Configuration;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
 
 import javax.annotation.Nullable;
 
@@ -32,6 +34,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -45,6 +48,7 @@ public class MySqlSourceConfig implements Serializable {
     private final String password;
     private final List<String> databaseList;
     private final List<String> tableList;
+    private final String excludeTableList;
     @Nullable private final ServerIdRange serverIdRange;
     private final StartupOptions startupOptions;
     private final int splitSize;
@@ -77,6 +81,7 @@ public class MySqlSourceConfig implements Serializable {
             String password,
             List<String> databaseList,
             List<String> tableList,
+            @Nullable String excludeTableList,
             @Nullable ServerIdRange serverIdRange,
             StartupOptions startupOptions,
             int splitSize,
@@ -101,6 +106,7 @@ public class MySqlSourceConfig implements Serializable {
         this.password = password;
         this.databaseList = checkNotNull(databaseList);
         this.tableList = checkNotNull(tableList);
+        this.excludeTableList = excludeTableList;
         this.serverIdRange = serverIdRange;
         this.startupOptions = checkNotNull(startupOptions);
         this.splitSize = splitSize;
@@ -216,10 +222,27 @@ public class MySqlSourceConfig implements Serializable {
         return dbzMySqlConfig;
     }
 
+    @Deprecated
     public RelationalTableFilters getTableFilters() {
         return dbzMySqlConfig.getTableFilters();
     }
 
+    public Predicate<String> getDatabaseFilter() {
+        RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters();
+        return (String databaseName) -> tableFilters.databaseFilter().test(databaseName);
+    }
+
+    public Predicate<TableId> getTableFilter() {
+        RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters();
+        Selectors excludeTableFilter =
+                (excludeTableList == null
+                        ? null
+                        : new Selectors.SelectorsBuilder().includeTables(excludeTableList).build());
+        return (TableId tableId) ->
+                tableFilters.dataCollectionFilter().isIncluded(tableId)
+                        && (excludeTableFilter == null || !excludeTableFilter.isMatch(tableId));
+    }
+
     public Properties getJdbcProperties() {
         return jdbcProperties;
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index c994c8241..8b65055ca 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -49,6 +49,7 @@ public class MySqlSourceConfigFactory implements Serializable {
     private ServerIdRange serverIdRange;
     private List<String> databaseList;
     private List<String> tableList;
+    private String excludeTableList;
     private String serverTimeZone = ZoneId.systemDefault().getId();
     private StartupOptions startupOptions = StartupOptions.initial();
     private int splitSize = MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue();
@@ -102,6 +103,11 @@ public class MySqlSourceConfigFactory implements Serializable {
         return this;
     }
 
+    public MySqlSourceConfigFactory excludeTableList(String tableInclusions) {
+        this.excludeTableList = tableInclusions;
+        return this;
+    }
+
     /** Name of the MySQL database to use when connecting to the MySQL database server. */
     public MySqlSourceConfigFactory username(String username) {
         this.username = username;
@@ -360,6 +366,7 @@ public class MySqlSourceConfigFactory implements Serializable {
                 password,
                 databaseList,
                 tableList,
+                excludeTableList,
                 serverIdRange,
                 startupOptions,
                 splitSize,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index ae3084363..4ffea7ce0 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -235,10 +235,7 @@ public class MySqlSourceReader<T>
             LOG.info("Source reader {} adds split {}", subtaskId, split);
             if (split.isSnapshotSplit()) {
                 MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
-                if (sourceConfig
-                        .getTableFilters()
-                        .dataCollectionFilter()
-                        .isIncluded(split.asSnapshotSplit().getTableId())) {
+                if (sourceConfig.getTableFilter().test(split.asSnapshotSplit().getTableId())) {
                     if (snapshotSplit.isSnapshotReadFinished()) {
                         finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                     } else {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
index c7b74b654..dedeab2dd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.connector.mysql.MySqlPartition;
 import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.RelationalTableFilters;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 import org.slf4j.Logger;
@@ -35,6 +34,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /** Utilities to discovery matched tables. */
@@ -42,7 +42,8 @@ public class TableDiscoveryUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);
 
-    public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
+    public static List<TableId> listTables(
+            JdbcConnection jdbc, Predicate<String> databaseFilter, Predicate<TableId> tableFilter)
             throws SQLException {
         final List<TableId> capturedTableIds = new ArrayList<>();
         // -------------------
@@ -57,7 +58,7 @@ public class TableDiscoveryUtils {
                 rs -> {
                     while (rs.next()) {
                         String databaseName = rs.getString(1);
-                        if (tableFilters.databaseFilter().test(databaseName)) {
+                        if (databaseFilter.test(databaseName)) {
                             databaseNames.add(databaseName);
                         }
                     }
@@ -81,7 +82,7 @@ public class TableDiscoveryUtils {
                         rs -> {
                             while (rs.next()) {
                                 TableId tableId = new TableId(dbName, null, rs.getString(1));
-                                if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
+                                if (tableFilter.test(tableId)) {
                                     capturedTableIds.add(tableId);
                                     LOG.info(
                                             "\t including table '{}' for further processing",
@@ -106,7 +107,9 @@ public class TableDiscoveryUtils {
             MySqlPartition partition, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
         final List<TableId> capturedTableIds;
         try {
-            capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
+            capturedTableIds =
+                    listTables(
+                            jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter());
         } catch (SQLException e) {
             throw new FlinkRuntimeException("Failed to discover captured tables", e);
         }
@@ -121,7 +124,11 @@ public class TableDiscoveryUtils {
         final List<TableId> capturedTableIds;
         try {
             capturedTableIds =
-                    listTables(jdbc, sourceConfig.getTableFilters()).stream()
+                    listTables(
+                                    jdbc,
+                                    sourceConfig.getDatabaseFilter(),
+                                    sourceConfig.getTableFilter())
+                            .stream()
                             .filter(tableId -> !existedTables.contains(tableId))
                             .collect(Collectors.toList());
         } catch (SQLException e) {


Reply via email to