This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 565032e3a [FLINK-36115][pipeline-connector][mysql] Introduce
scan.newly-added-table.enabled option for MySQL Source
565032e3a is described below
commit 565032e3af66677cffd8a797de9af7c9f4490f57
Author: Kunni <[email protected]>
AuthorDate: Thu Aug 22 12:39:16 2024 +0800
[FLINK-36115][pipeline-connector][mysql] Introduce
scan.newly-added-table.enabled option for MySQL Source
This closes #3560.
---
.../docs/connectors/pipeline-connectors/mysql.md | 11 ++++
.../docs/connectors/pipeline-connectors/mysql.md | 11 ++++
.../mysql/factory/MySqlDataSourceFactory.java | 71 +++++++++++++++++-----
.../mysql/source/MySqlDataSourceOptions.java | 11 ++++
.../source/MysqlPipelineNewlyAddedTableITCase.java | 46 +++++++++++++-
5 files changed, 131 insertions(+), 19 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 636ed975f..4d5edb66f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,17 @@ pipeline:
<td>Boolean</td>
<td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
</tr>
+ <tr>
+ <td>scan.binlog.newly-added-table.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。 <br>
+ scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的区别在于: <br>
+ scan.newly-added-table.enabled: 在作业从 savepoint/checkpoint 恢复后,先读取新增表的全量数据,再读取其增量数据; <br>
+ scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据(不读取全量数据)。开启此参数时,tables 参数中不能包含 `,`。
+ </td>
+ </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md
b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 879701614..36c4cc770 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -282,6 +282,17 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable scan the newly added tables feature or not, by
default is false. This option is only useful when we start the job from a
savepoint/checkpoint.</td>
</tr>
+ <tr>
+ <td>scan.binlog.newly-added-table.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to capture the DDL and DML changes of newly added tables during the binlog reading phase. Defaults to false. <br>
+ The difference between the scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: <br>
+ scan.newly-added-table.enabled: when the job is restored from a savepoint/checkpoint, newly added tables are first snapshotted and then read from the binlog; <br>
+ scan.binlog.newly-added-table.enabled: newly added tables are only read from the binlog during the binlog reading phase (no snapshot). When this option is enabled, the `tables` option must not contain `,`.
+ </td>
+ </tr>
</tbody>
</table>
</div>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 7f7691961..5f538eeb1 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
@@ -38,6 +39,7 @@ import
org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
+import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +63,7 @@ import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@@ -128,6 +131,8 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled =
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ boolean scanBinlogNewlyAddedTableEnabled =
+ config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize,
1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -166,26 +171,32 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
- Selectors selectors = new
Selectors.SelectorsBuilder().includeTables(tables).build();
- List<String> capturedTables =
getTableList(configFactory.createConfig(0), selectors);
- if (capturedTables.isEmpty()) {
- throw new IllegalArgumentException(
- "Cannot find any table by the option 'tables' = " +
tables);
- }
- if (tablesExclude != null) {
- Selectors selectExclude =
- new
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
- List<String> excludeTables =
getTableList(configFactory.createConfig(0), selectExclude);
- if (!excludeTables.isEmpty()) {
- capturedTables.removeAll(excludeTables);
- }
+ if (scanBinlogNewlyAddedTableEnabled) {
+ String newTables = validateTableAndReturnDebeziumStyle(tables);
+ configFactory.tableList(newTables);
+ } else {
+ Selectors selectors = new
Selectors.SelectorsBuilder().includeTables(tables).build();
+ List<String> capturedTables =
getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
- "Cannot find any table with by the option
'tables.exclude' = "
- + tablesExclude);
+ "Cannot find any table by the option 'tables' = " +
tables);
}
+ if (tablesExclude != null) {
+ Selectors selectExclude =
+ new
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
+ List<String> excludeTables =
+ getTableList(configFactory.createConfig(0),
selectExclude);
+ if (!excludeTables.isEmpty()) {
+ capturedTables.removeAll(excludeTables);
+ }
+ if (capturedTables.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Cannot find any table with by the option
'tables.exclude' = "
+ + tablesExclude);
+ }
+ }
+ configFactory.tableList(capturedTables.toArray(new String[0]));
}
- configFactory.tableList(capturedTables.toArray(new String[0]));
String chunkKeyColumns =
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
@@ -256,6 +267,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
return options;
}
@@ -410,6 +422,33 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
distributionFactorLower));
}
+    /**
+     * Validates the {@code tables} option and returns it in the regular-expression style used by
+     * Debezium's {@link Tables.TableFilter}, whose syntax differs from {@link Selectors}:
+     *
+     * <p>1) {@link Selectors} separates table patterns with {@code ,} while {@link
+     * Tables.TableFilter} expects {@code |}, so a {@code ,} is rejected in this mode.
+     *
+     * <p>2) {@link Selectors} needs a wildcard dot escaped as {@code \.} (see {@link
+     * MySqlDataSourceOptions#TABLES}); the escape is stripped so Debezium sees a plain {@code .}.
+     *
+     * @param tables value of the {@code tables} option
+     * @return {@code tables} with every {@code \.} replaced by {@code .}
+     * @throws IllegalArgumentException if {@code tables} contains a {@code ,}
+     */
+    private String validateTableAndReturnDebeziumStyle(String tables) {
+        // This mode does not translate comma-separated pattern lists, so reject any `,`.
+        if (tables.contains(",")) {
+            throw new IllegalArgumentException(
+                    "the `,` in "
+                            + tables
+                            + " is not supported when "
+                            + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key()
+                            + " was enabled.");
+        }
+        return tables.replace("\\.", ".");
+    }
+
/** Replaces the default timezone placeholder with session timezone, if
applicable. */
private static ZoneId getServerTimeZone(Configuration config) {
final String serverTimeZone = config.get(SERVER_TIME_ZONE);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 9a18350b3..580d370b5 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -261,4 +261,15 @@ public class MySqlDataSourceOptions {
+ "If there is a need to use a dot (.) in
a regular expression to match any character, "
+ "it is necessary to escape the dot with
a backslash."
+ "eg. db0.\\.*, db1.user_table_[0-9]+,
db[1-2].[app|web]_order_\\.*");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.binlog.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n"
+                                    + "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n"
+                                    + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n"
+                                    + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. When this option is enabled, the 'tables' option must not contain ','.");
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
index 7187e6447..4cc1e952a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -21,6 +21,7 @@ import
org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
@@ -83,7 +84,10 @@ import static java.lang.String.format;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
@@ -149,6 +153,40 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
return DebeziumUtils.createMySqlConnection(configuration, new
Properties());
}
+    @Test
+    public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
+        List<String> tables = Collections.singletonList("address_\\.*"); // "\." = any char
+        Map<String, String> options = new HashMap<>();
+        options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        options.put(SCAN_STARTUP_MODE.key(), "timestamp"); // start at the timestamp below
+        options.put(
+                SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
+
+        FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options);
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+        env.enableCheckpointing(200);
+        DataStreamSource<Event> source =
+                env.fromSource(
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                source.getTransformation().getOutputType().createSerializer(env.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(env, source, resultBuffer, serializer, accumulatorName);
+        env.executeAsync("AddNewlyTablesWhenReadingBinlog"); // TODO(review): cancel job at end?
+        initialAddressTables(getConnection(), Collections.singletonList("address_beijing"));
+        List<Event> actual = fetchResults(iterator, 4); // only actual.get(0) is asserted below
+        assertThat(((ChangeEvent) actual.get(0)).tableId())
+                .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing"));
+    }
+
@Test
public void testAddNewTableOneByOneSingleParallelism() throws Exception {
TestParam testParam =
@@ -228,7 +266,7 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
List<String> listenTablesFirstRound =
testParam.getFirstRoundListenTables();
FlinkSourceProvider sourceProvider =
- getFlinkSourceProvider(listenTablesFirstRound, parallelism);
+ getFlinkSourceProvider(listenTablesFirstRound, parallelism,
new HashMap<>());
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
@@ -272,7 +310,7 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
getStreamExecutionEnvironment(finishedSavePointPath,
parallelism);
List<String> listenTablesSecondRound =
testParam.getSecondRoundListenTables();
FlinkSourceProvider restoredSourceProvider =
- getFlinkSourceProvider(listenTablesSecondRound, parallelism);
+ getFlinkSourceProvider(listenTablesSecondRound, parallelism,
new HashMap<>());
DataStreamSource<Event> restoreSource =
restoredEnv.fromSource(
restoredSourceProvider.getSource(),
@@ -432,7 +470,8 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
}
}
- private FlinkSourceProvider getFlinkSourceProvider(List<String> tables,
int parallelism) {
+ private FlinkSourceProvider getFlinkSourceProvider(
+ List<String> tables, int parallelism, Map<String, String>
additionalOptions) {
List<String> fullTableNames =
tables.stream()
.map(table -> customDatabase.getDatabaseName() + "." +
table)
@@ -446,6 +485,7 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
options.put(SERVER_ID.key(), getServerId(parallelism));
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+ options.putAll(additionalOptions);
Factory.Context context =
new FactoryHelper.DefaultContext(
org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),