This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new afe9c3c0a [FLINK-36150][pipeline-connector/mysql] tables.exclude
should work even when scan.binlog.newly-added-table.enabled is true
afe9c3c0a is described below
commit afe9c3c0adb77594565b24c004daa6385d50d5fc
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Aug 28 00:33:09 2024 +0800
[FLINK-36150][pipeline-connector/mysql] tables.exclude should work even when
scan.binlog.newly-added-table.enabled is true
This closes #3573.
---
.../mysql/factory/MySqlDataSourceFactory.java | 25 +++--
.../source/reader/MySqlPipelineRecordEmitter.java | 4 +-
.../mysql/source/MySqlDataSourceFactoryTest.java | 22 ++++
.../source/MysqlPipelineNewlyAddedTableITCase.java | 66 ++++++++++--
.../connectors/mysql/debezium/DebeziumUtils.java | 4 +-
.../mysql/debezium/reader/BinlogSplitReader.java | 12 +--
.../cdc/connectors/mysql/schema/Selectors.java | 119 +++++++++++++++++++++
.../assigners/MySqlSnapshotSplitAssigner.java | 6 +-
.../mysql/source/config/MySqlSourceConfig.java | 23 ++++
.../source/config/MySqlSourceConfigFactory.java | 7 ++
.../mysql/source/reader/MySqlSourceReader.java | 5 +-
.../mysql/source/utils/TableDiscoveryUtils.java | 19 ++--
12 files changed, 274 insertions(+), 38 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 5f538eeb1..118e8cdb1 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -43,6 +42,8 @@ import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashMap;
@@ -171,12 +172,21 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+ List<TableId> tableIds =
MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
+
+ if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
+ throw new IllegalArgumentException(
+ "If both scan.binlog.newly-added-table.enabled and
scan.newly-added-table.enabled are true, data maybe duplicate after restore");
+ }
+
if (scanBinlogNewlyAddedTableEnabled) {
String newTables = validateTableAndReturnDebeziumStyle(tables);
configFactory.tableList(newTables);
+ configFactory.excludeTableList(tablesExclude);
+
} else {
Selectors selectors = new
Selectors.SelectorsBuilder().includeTables(tables).build();
- List<String> capturedTables =
getTableList(configFactory.createConfig(0), selectors);
+ List<String> capturedTables = getTableList(tableIds, selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " +
tables);
@@ -184,8 +194,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
if (tablesExclude != null) {
Selectors selectExclude =
new
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
- List<String> excludeTables =
- getTableList(configFactory.createConfig(0),
selectExclude);
+ List<String> excludeTables = getTableList(tableIds,
selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
@@ -201,8 +210,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
String chunkKeyColumns =
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
- List<TableId> tableIds =
- MySqlSchemaUtils.listTables(configFactory.createConfig(0),
null);
+
for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
String[] splits = chunkKeyColumn.split(":");
if (splits.length == 2) {
@@ -284,8 +292,9 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET =
"specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP =
"timestamp";
- private static List<String> getTableList(MySqlSourceConfig sourceConfig,
Selectors selectors) {
- return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
+ private static List<String> getTableList(
+ @Nullable List<TableId> tableIdList, Selectors selectors) {
+ return tableIdList.stream()
.filter(selectors::isMatch)
.map(TableId::toString)
.collect(Collectors.toList());
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 909ed6c5b..4f801e0fa 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -237,7 +237,9 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig
sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<CreateTableEvent> createTableEventCache = new ArrayList<>();
- List<TableId> capturedTableIds = listTables(jdbc,
sourceConfig.getTableFilters());
+ List<TableId> capturedTableIds =
+ listTables(
+ jdbc, sourceConfig.getDatabaseFilter(),
sourceConfig.getTableFilter());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index c7b442480..277bb74ac 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -39,10 +39,12 @@ import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
+import static
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static org.assertj.core.api.Assertions.assertThat;
@@ -71,6 +73,26 @@ public class MySqlDataSourceFactoryTest extends
MySqlSourceTestBase {
.isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() +
".products"));
}
+ @Test
+ public void testCreateSourceScanBinlogNewlyAddedTableEnabled() {
+ inventoryDatabase.createAndInitialize();
+ Map<String, String> options = new HashMap<>();
+ options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+ options.put(PORT.key(),
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+ options.put(USERNAME.key(), TEST_USER);
+ options.put(PASSWORD.key(), TEST_PASSWORD);
+ options.put(TABLES.key(), inventoryDatabase.getDatabaseName() +
".prod\\.*");
+ options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+ options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+ Factory.Context context = new
MockContext(Configuration.fromMap(options));
+
+ MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+ assertThatThrownBy(() -> factory.createDataSource(context))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "If both scan.binlog.newly-added-table.enabled and
scan.newly-added-table.enabled are true, data maybe duplicate after restore");
+ }
+
@Test
public void testNoMatchedTable() {
inventoryDatabase.createAndInitialize();
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
index fcd80440d..87e44d240 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -60,6 +60,7 @@ import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -91,6 +92,7 @@ import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
@@ -157,12 +159,12 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
List<String> tables = Collections.singletonList("address_\\.*");
Map<String, String> options = new HashMap<>();
- options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(SCAN_STARTUP_MODE.key(), "timestamp");
options.put(
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
String.valueOf(System.currentTimeMillis()));
- FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4,
options);
+ FlinkSourceProvider sourceProvider =
+ getFlinkSourceProvider(tables, 4, options, false, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new
Configuration());
env.enableCheckpointing(200);
@@ -194,6 +196,47 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
assertThat(tableNames.get(1)).isEqualTo("address_shanghai");
}
+ @Test
+ public void testScanBinlogNewlyAddedTableEnabledAndExcludeTables() throws
Exception {
+ List<String> tables = Collections.singletonList("address_\\.*");
+ Map<String, String> options = new HashMap<>();
+ options.put(TABLES_EXCLUDE.key(), customDatabase.getDatabaseName() +
".address_beijing");
+ options.put(SCAN_STARTUP_MODE.key(), "timestamp");
+ options.put(
+ SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
String.valueOf(System.currentTimeMillis()));
+
+ FlinkSourceProvider sourceProvider =
+ getFlinkSourceProvider(tables, 4, options, false, true);
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new
Configuration());
+ env.enableCheckpointing(200);
+ DataStreamSource<Event> source =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ MySqlDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> serializer =
+
source.getTransformation().getOutputType().createSerializer(env.getConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+ addCollector(env, source, resultBuffer, serializer,
accumulatorName);
+ env.executeAsync("AddNewlyTablesWhenReadingBinlog");
+ initialAddressTables(
+ getConnection(), Lists.newArrayList("address_beijing",
"address_shanghai"));
+ List<Event> actual = fetchResults(iterator, 4);
+ List<String> tableNames =
+ actual.stream()
+ .filter((event) -> event instanceof CreateTableEvent)
+ .map((event) -> ((SchemaChangeEvent)
event).tableId().getTableName())
+ .collect(Collectors.toList());
+ assertThat(tableNames.size()).isEqualTo(1);
+ assertThat(tableNames.get(0)).isEqualTo("address_shanghai");
+ }
+
@Test
public void testAddNewTableOneByOneSingleParallelism() throws Exception {
TestParam testParam =
@@ -273,7 +316,8 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
List<String> listenTablesFirstRound =
testParam.getFirstRoundListenTables();
FlinkSourceProvider sourceProvider =
- getFlinkSourceProvider(listenTablesFirstRound, parallelism,
new HashMap<>());
+ getFlinkSourceProvider(
+ listenTablesFirstRound, parallelism, new HashMap<>(),
true, false);
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
@@ -317,7 +361,8 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
getStreamExecutionEnvironment(finishedSavePointPath,
parallelism);
List<String> listenTablesSecondRound =
testParam.getSecondRoundListenTables();
FlinkSourceProvider restoredSourceProvider =
- getFlinkSourceProvider(listenTablesSecondRound, parallelism,
new HashMap<>());
+ getFlinkSourceProvider(
+ listenTablesSecondRound, parallelism, new HashMap<>(),
true, false);
DataStreamSource<Event> restoreSource =
restoredEnv.fromSource(
restoredSourceProvider.getSource(),
@@ -478,7 +523,11 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
}
private FlinkSourceProvider getFlinkSourceProvider(
- List<String> tables, int parallelism, Map<String, String>
additionalOptions) {
+ List<String> tables,
+ int parallelism,
+ Map<String, String> additionalOptions,
+ boolean enableScanNewlyAddedTable,
+ boolean enableBinlogScanNewlyAddedTable) {
List<String> fullTableNames =
tables.stream()
.map(table -> customDatabase.getDatabaseName() + "." +
table)
@@ -491,7 +540,12 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
options.put(SERVER_TIME_ZONE.key(), "UTC");
options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
options.put(SERVER_ID.key(), getServerId(parallelism));
- options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+ if (enableScanNewlyAddedTable) {
+ options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+ }
+ if (enableBinlogScanNewlyAddedTable) {
+ options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+ }
options.putAll(additionalOptions);
Factory.Context context =
new FactoryHelper.DefaultContext(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
index ac3b20c4f..7ca60be5b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
@@ -193,7 +193,9 @@ public class DebeziumUtils {
final List<TableId> capturedTableIds;
try {
- capturedTableIds = TableDiscoveryUtils.listTables(jdbc,
sourceConfig.getTableFilters());
+ capturedTableIds =
+ TableDiscoveryUtils.listTables(
+ jdbc, sourceConfig.getDatabaseFilter(),
sourceConfig.getTableFilter());
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured
tables", e);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index e1ef3895f..31173469b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -40,7 +40,6 @@ import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -81,7 +80,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
// tableId -> the max splitHighWatermark
private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
private final Set<TableId> pureBinlogPhaseTables;
- private Tables.TableFilter capturedTableFilter;
+ private Predicate capturedTableFilter;
private final StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();
@@ -100,8 +99,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
configureFilter();
statefulTaskContext.configure(currentBinlogSplit);
- this.capturedTableFilter =
-
statefulTaskContext.getConnectorConfig().getTableFilters().dataCollectionFilter();
+ this.capturedTableFilter =
statefulTaskContext.getSourceConfig().getTableFilter();
this.queue = statefulTaskContext.getQueue();
this.binlogSplitReadTask =
new MySqlBinlogSplitReadTask(
@@ -247,7 +245,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
} else if (RecordUtils.isSchemaChangeEvent(sourceRecord)) {
if (RecordUtils.isTableChangeRecord(sourceRecord)) {
TableId tableId = RecordUtils.getTableId(sourceRecord);
- return capturedTableFilter.isIncluded(tableId);
+ return capturedTableFilter.test(tableId);
} else {
// Not related to changes in table structure, like
`CREATE/DROP DATABASE`, skip it
return false;
@@ -270,7 +268,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
if
(!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
// the new added sharding table without history records
return !maxSplitHighWatermarkMap.containsKey(tableId)
- && capturedTableFilter.isIncluded(tableId);
+ && capturedTableFilter.test(tableId);
}
return false;
}
@@ -280,7 +278,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
currentBinlogSplit.getFinishedSnapshotSplitInfos();
Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new
HashMap<>();
Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
- // specific offset mode
+ // startup mode which is stream only
if (finishedSplitInfos.isEmpty()) {
for (TableId tableId :
currentBinlogSplit.getTableSchemas().keySet()) {
tableIdBinlogPositionMap.put(tableId,
currentBinlogSplit.getStartingOffset());
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java
new file mode 100644
index 000000000..bf40b66d7
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.schema;
+
+import org.apache.flink.cdc.common.utils.Predicates;
+
+import io.debezium.relational.TableId;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/** Selectors for filtering tables. */
+public class Selectors {
+
+ private List<Selector> selectors;
+
+ private Selectors() {}
+
+ /**
+ * A {@link Selector} that determines whether a table identified by a
given {@link TableId} is
+ * to be included.
+ */
+ private static class Selector {
+ private final Predicate<String> namespacePred;
+ private final Predicate<String> tableNamePred;
+
+ public Selector(String namespace, String tableName) {
+ this.namespacePred =
+ namespace == null ? (namespacePred) -> false :
Predicates.includes(namespace);
+ this.tableNamePred =
+ tableName == null ? (tableNamePred) -> false :
Predicates.includes(tableName);
+ }
+
+ public boolean isMatch(TableId tableId) {
+
+ String namespace = tableId.catalog();
+
+ if (namespace == null || namespace.isEmpty()) {
+ return tableNamePred.test(tableId.table());
+ }
+ return namespacePred.test(tableId.catalog()) &&
tableNamePred.test(tableId.table());
+ }
+ }
+
+ /** Match the {@link TableId} against the {@link Selector}s. * */
+ public boolean isMatch(TableId tableId) {
+ for (Selector selector : selectors) {
+ if (selector.isMatch(tableId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Builder for {@link Selectors}. */
+ public static class SelectorsBuilder {
+
+ private List<Selector> selectors;
+
+ /**
+ * Current {@link TableId} used in mysql cdc connector will map
database name to catalog.
+ *
+ * @param tableInclusions
+ * @return
+ */
+ public SelectorsBuilder includeTables(String tableInclusions) {
+
+ if (tableInclusions == null || tableInclusions.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Invalid table inclusion pattern cannot be null or
empty");
+ }
+
+ List<Selector> selectors = new ArrayList<>();
+ Set<String> tableSplitSet =
+ Predicates.setOf(
+ tableInclusions,
Predicates.RegExSplitterByComma::split, (str) -> str);
+ for (String tableSplit : tableSplitSet) {
+ List<String> tableIdList =
+ Predicates.listOf(
+ tableSplit,
Predicates.RegExSplitterByDot::split, (str) -> str);
+ Iterator<String> iterator = tableIdList.iterator();
+ if (tableIdList.size() == 1) {
+ selectors.add(new Selector(null, iterator.next()));
+ } else if (tableIdList.size() == 2) {
+ selectors.add(new Selector(iterator.next(),
iterator.next()));
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid table inclusion pattern: " +
tableInclusions);
+ }
+ }
+ this.selectors = selectors;
+ return this;
+ }
+
+ public Selectors build() {
+ Selectors selectors = new Selectors();
+ selectors.selectors = this.selectors;
+ return selectors;
+ }
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 9ea69b11a..89985ae2f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -601,11 +601,7 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
return new MySqlChunkSplitter(
mySqlSchema,
sourceConfig,
- tableId != null
- && sourceConfig
- .getTableFilters()
- .dataCollectionFilter()
- .isIncluded(tableId)
+ tableId != null &&
sourceConfig.getTableFilter().test(tableId)
? chunkSplitterState
: ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 2e19156e5..dd0ac7896 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql.source.config;
+import org.apache.flink.cdc.connectors.mysql.schema.Selectors;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.table.catalog.ObjectPath;
@@ -24,6 +25,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
import javax.annotation.Nullable;
@@ -32,6 +34,7 @@ import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.function.Predicate;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,6 +48,7 @@ public class MySqlSourceConfig implements Serializable {
private final String password;
private final List<String> databaseList;
private final List<String> tableList;
+ private final String excludeTableList;
@Nullable private final ServerIdRange serverIdRange;
private final StartupOptions startupOptions;
private final int splitSize;
@@ -77,6 +81,7 @@ public class MySqlSourceConfig implements Serializable {
String password,
List<String> databaseList,
List<String> tableList,
+ @Nullable String excludeTableList,
@Nullable ServerIdRange serverIdRange,
StartupOptions startupOptions,
int splitSize,
@@ -101,6 +106,7 @@ public class MySqlSourceConfig implements Serializable {
this.password = password;
this.databaseList = checkNotNull(databaseList);
this.tableList = checkNotNull(tableList);
+ this.excludeTableList = excludeTableList;
this.serverIdRange = serverIdRange;
this.startupOptions = checkNotNull(startupOptions);
this.splitSize = splitSize;
@@ -216,10 +222,27 @@ public class MySqlSourceConfig implements Serializable {
return dbzMySqlConfig;
}
+ @Deprecated
public RelationalTableFilters getTableFilters() {
return dbzMySqlConfig.getTableFilters();
}
+ public Predicate<String> getDatabaseFilter() {
+ RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters();
+ return (String databaseName) ->
tableFilters.databaseFilter().test(databaseName);
+ }
+
+ public Predicate<TableId> getTableFilter() {
+ RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters();
+ Selectors excludeTableFilter =
+ (excludeTableList == null
+ ? null
+ : new
Selectors.SelectorsBuilder().includeTables(excludeTableList).build());
+ return (TableId tableId) ->
+ tableFilters.dataCollectionFilter().isIncluded(tableId)
+ && (excludeTableFilter == null ||
!excludeTableFilter.isMatch(tableId));
+ }
+
public Properties getJdbcProperties() {
return jdbcProperties;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index c994c8241..8b65055ca 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -49,6 +49,7 @@ public class MySqlSourceConfigFactory implements Serializable
{
private ServerIdRange serverIdRange;
private List<String> databaseList;
private List<String> tableList;
+ private String excludeTableList;
private String serverTimeZone = ZoneId.systemDefault().getId();
private StartupOptions startupOptions = StartupOptions.initial();
private int splitSize =
MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue();
@@ -102,6 +103,11 @@ public class MySqlSourceConfigFactory implements
Serializable {
return this;
}
+ public MySqlSourceConfigFactory excludeTableList(String tableInclusions) {
+ this.excludeTableList = tableInclusions;
+ return this;
+ }
+
/** Name of the MySQL database to use when connecting to the MySQL
database server. */
public MySqlSourceConfigFactory username(String username) {
this.username = username;
@@ -360,6 +366,7 @@ public class MySqlSourceConfigFactory implements
Serializable {
password,
databaseList,
tableList,
+ excludeTableList,
serverIdRange,
startupOptions,
splitSize,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index ae3084363..4ffea7ce0 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -235,10 +235,7 @@ public class MySqlSourceReader<T>
LOG.info("Source reader {} adds split {}", subtaskId, split);
if (split.isSnapshotSplit()) {
MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
- if (sourceConfig
- .getTableFilters()
- .dataCollectionFilter()
- .isIncluded(split.asSnapshotSplit().getTableId())) {
+ if
(sourceConfig.getTableFilter().test(split.asSnapshotSplit().getTableId())) {
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(),
snapshotSplit);
} else {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
index c7b74b654..dedeab2dd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
@@ -35,6 +34,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/** Utilities to discover matched tables. */
@@ -42,7 +42,8 @@ public class TableDiscoveryUtils {
private static final Logger LOG =
LoggerFactory.getLogger(TableDiscoveryUtils.class);
- public static List<TableId> listTables(JdbcConnection jdbc,
RelationalTableFilters tableFilters)
+ public static List<TableId> listTables(
+ JdbcConnection jdbc, Predicate<String> databaseFilter,
Predicate<TableId> tableFilter)
throws SQLException {
final List<TableId> capturedTableIds = new ArrayList<>();
// -------------------
@@ -57,7 +58,7 @@ public class TableDiscoveryUtils {
rs -> {
while (rs.next()) {
String databaseName = rs.getString(1);
- if (tableFilters.databaseFilter().test(databaseName)) {
+ if (databaseFilter.test(databaseName)) {
databaseNames.add(databaseName);
}
}
@@ -81,7 +82,7 @@ public class TableDiscoveryUtils {
rs -> {
while (rs.next()) {
TableId tableId = new TableId(dbName, null,
rs.getString(1));
- if
(tableFilters.dataCollectionFilter().isIncluded(tableId)) {
+ if (tableFilter.test(tableId)) {
capturedTableIds.add(tableId);
LOG.info(
"\t including table '{}' for
further processing",
@@ -106,7 +107,9 @@ public class TableDiscoveryUtils {
MySqlPartition partition, MySqlSourceConfig sourceConfig,
MySqlConnection jdbc) {
final List<TableId> capturedTableIds;
try {
- capturedTableIds = listTables(jdbc,
sourceConfig.getTableFilters());
+ capturedTableIds =
+ listTables(
+ jdbc, sourceConfig.getDatabaseFilter(),
sourceConfig.getTableFilter());
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured
tables", e);
}
@@ -121,7 +124,11 @@ public class TableDiscoveryUtils {
final List<TableId> capturedTableIds;
try {
capturedTableIds =
- listTables(jdbc, sourceConfig.getTableFilters()).stream()
+ listTables(
+ jdbc,
+ sourceConfig.getDatabaseFilter(),
+ sourceConfig.getTableFilter())
+ .stream()
.filter(tableId ->
!existedTables.contains(tableId))
.collect(Collectors.toList());
} catch (SQLException e) {