This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 565032e3a [FLINK-36115][pipeline-connector][mysql]  Introduce 
scan.newly-added-table.enabled option for MySQL Source
565032e3a is described below

commit 565032e3af66677cffd8a797de9af7c9f4490f57
Author: Kunni <[email protected]>
AuthorDate: Thu Aug 22 12:39:16 2024 +0800

    [FLINK-36115][pipeline-connector][mysql]  Introduce 
scan.newly-added-table.enabled option for MySQL Source
    
    This closes  #3560.
---
 .../docs/connectors/pipeline-connectors/mysql.md   | 11 ++++
 .../docs/connectors/pipeline-connectors/mysql.md   | 11 ++++
 .../mysql/factory/MySqlDataSourceFactory.java      | 71 +++++++++++++++++-----
 .../mysql/source/MySqlDataSourceOptions.java       | 11 ++++
 .../source/MysqlPipelineNewlyAddedTableITCase.java | 46 +++++++++++++-
 5 files changed, 131 insertions(+), 19 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 636ed975f..4d5edb66f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,17 @@ pipeline:
       <td>Boolean</td>
       <td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
     </tr>
+    <tr>
+      <td>scan.binlog.newly-added-table.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值为 false。 <br>
+          scan.newly-added-table.enabled 与 scan.binlog.newly-added-table.enabled 参数的区别在于: <br>
+          scan.newly-added-table.enabled: 作业从 savepoint/checkpoint 恢复后,对新增表先读取全量数据再读取增量数据; <br>
+          scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据,不读取全量数据。
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md 
b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 879701614..36c4cc770 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -282,6 +282,17 @@ pipeline:
       <td>Boolean</td>
       <td>Whether to enable scan the newly added tables feature or not, by 
default is false. This option is only useful when we start the job from a 
savepoint/checkpoint.</td>
     </tr>
+    <tr>
+      <td>scan.binlog.newly-added-table.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>During the binlog reading stage, whether to read the DDL and DML changes of newly added tables. Defaults to false. <br>
+          The difference between the scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: <br>
+          scan.newly-added-table.enabled: snapshots newly added tables and then reads their binlog when the job is restored from a savepoint/checkpoint; <br>
+          scan.binlog.newly-added-table.enabled: only reads the binlog of tables that are added during the binlog reading phase, without snapshotting them.
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 7f7691961..5f538eeb1 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
@@ -38,6 +39,7 @@ import 
org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectPath;
 
+import io.debezium.relational.Tables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@@ -128,6 +131,8 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
         boolean scanNewlyAddedTableEnabled = 
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        boolean scanBinlogNewlyAddedTableEnabled =
+                config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -166,26 +171,32 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
                         .jdbcProperties(getJdbcProperties(configMap))
                         
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
 
-        Selectors selectors = new 
Selectors.SelectorsBuilder().includeTables(tables).build();
-        List<String> capturedTables = 
getTableList(configFactory.createConfig(0), selectors);
-        if (capturedTables.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Cannot find any table by the option 'tables' = " + 
tables);
-        }
-        if (tablesExclude != null) {
-            Selectors selectExclude =
-                    new 
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
-            List<String> excludeTables = 
getTableList(configFactory.createConfig(0), selectExclude);
-            if (!excludeTables.isEmpty()) {
-                capturedTables.removeAll(excludeTables);
-            }
+        if (scanBinlogNewlyAddedTableEnabled) {
+            String newTables = validateTableAndReturnDebeziumStyle(tables);
+            configFactory.tableList(newTables);
+        } else {
+            Selectors selectors = new 
Selectors.SelectorsBuilder().includeTables(tables).build();
+            List<String> capturedTables = 
getTableList(configFactory.createConfig(0), selectors);
             if (capturedTables.isEmpty()) {
                 throw new IllegalArgumentException(
-                        "Cannot find any table with by the option 
'tables.exclude'  = "
-                                + tablesExclude);
+                        "Cannot find any table by the option 'tables' = " + 
tables);
             }
+            if (tablesExclude != null) {
+                Selectors selectExclude =
+                        new 
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
+                List<String> excludeTables =
+                        getTableList(configFactory.createConfig(0), 
selectExclude);
+                if (!excludeTables.isEmpty()) {
+                    capturedTables.removeAll(excludeTables);
+                }
+                if (capturedTables.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Cannot find any table with by the option 
'tables.exclude'  = "
+                                    + tablesExclude);
+                }
+            }
+            configFactory.tableList(capturedTables.toArray(new String[0]));
         }
-        configFactory.tableList(capturedTables.toArray(new String[0]));
 
         String chunkKeyColumns = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         if (chunkKeyColumns != null) {
@@ -256,6 +267,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         options.add(CHUNK_META_GROUP_SIZE);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
         return options;
     }
 
@@ -410,6 +422,33 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
                         distributionFactorLower));
     }
 
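+    /**
+     * Validates a {@link Selectors}-style {@code tables} pattern and converts it to the style
+     * expected by Debezium's {@link Tables.TableFilter}, because the regular-expression syntax
+     * supported by the two currently differs.
+     *
+     * <p>The main differences are:
+     *
+     * <p>1) {@link Selectors} uses `,` to separate table patterns, while {@link
+     * Tables.TableFilter} uses `|`. This method does not translate one into the other; a value
+     * containing `,` is rejected.
+     *
+     * <p>2) In {@link Selectors}, a dot (.) that should match any character in a regular
+     * expression must be escaped with a backslash (see {@link MySqlDataSourceOptions#TABLES}).
+     * This method therefore rewrites every {@code \.} into a plain {@code .}.
+     *
+     * @param tables the value of the {@code tables} option
+     * @return the pattern with every {@code \.} replaced by {@code .}
+     * @throws IllegalArgumentException if {@code tables} contains {@code ,}
+     */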
+    private String validateTableAndReturnDebeziumStyle(String tables) {
+        // `,` is the Selectors pattern separator and is not translated to Debezium's `|`,
+        // so reject it here rather than pass it through unchanged.
+        if (tables.contains(",")) {
+            throw new IllegalArgumentException(
+                    "the `,` in "
+                            + tables
+                            + " is not supported when "
+                            + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED
+                            + " was enabled.");
+        }
+
+        return tables.replace("\\.", ".");
+    }
+
     /** Replaces the default timezone placeholder with session timezone, if 
applicable. */
     private static ZoneId getServerTimeZone(Configuration config) {
         final String serverTimeZone = config.get(SERVER_TIME_ZONE);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 9a18350b3..580d370b5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -261,4 +261,15 @@ public class MySqlDataSourceOptions {
                                     + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
                                     + "it is necessary to escape the dot with 
a backslash."
                                     + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.binlog.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "In binlog reading stage, whether to scan the ddl 
and dml statements of newly added tables or not, by default is false. \n"
+                                    + "The difference between 
scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled 
options is: \n"
+                                    + "scan.newly-added-table.enabled: do 
re-snapshot & binlog-reading for newly added table when restored; \n"
+                                    + "scan.binlog.newly-added-table.enabled: 
only do binlog-reading for newly added table during binlog reading phase.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
index 7187e6447..4cc1e952a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
@@ -83,7 +84,10 @@ import static java.lang.String.format;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
@@ -149,6 +153,40 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         return DebeziumUtils.createMySqlConnection(configuration, new 
Properties());
     }
 
+    @Test
+    public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
+        List<String> tables = Collections.singletonList("address_\\.*");
+        Map<String, String> options = new HashMap<>();
+        options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        options.put(SCAN_STARTUP_MODE.key(), "timestamp");
+        options.put(
+                SCAN_STARTUP_TIMESTAMP_MILLIS.key(), 
String.valueOf(System.currentTimeMillis()));
+
+        FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, 
options);
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+        env.enableCheckpointing(200);
+        DataStreamSource<Event> source =
+                env.fromSource(
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(env.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(env, source, resultBuffer, serializer, 
accumulatorName);
+        env.executeAsync("AddNewlyTablesWhenReadingBinlog");
+        initialAddressTables(getConnection(), 
Collections.singletonList("address_beijing"));
+        List<Event> actual = fetchResults(iterator, 4);
+        assertThat(((ChangeEvent) actual.get(0)).tableId())
+                .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), 
"address_beijing"));
+    }
+
     @Test
     public void testAddNewTableOneByOneSingleParallelism() throws Exception {
         TestParam testParam =
@@ -228,7 +266,7 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         List<String> listenTablesFirstRound = 
testParam.getFirstRoundListenTables();
 
         FlinkSourceProvider sourceProvider =
-                getFlinkSourceProvider(listenTablesFirstRound, parallelism);
+                getFlinkSourceProvider(listenTablesFirstRound, parallelism, 
new HashMap<>());
         DataStreamSource<Event> source =
                 env.fromSource(
                         sourceProvider.getSource(),
@@ -272,7 +310,7 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
                 getStreamExecutionEnvironment(finishedSavePointPath, 
parallelism);
         List<String> listenTablesSecondRound = 
testParam.getSecondRoundListenTables();
         FlinkSourceProvider restoredSourceProvider =
-                getFlinkSourceProvider(listenTablesSecondRound, parallelism);
+                getFlinkSourceProvider(listenTablesSecondRound, parallelism, 
new HashMap<>());
         DataStreamSource<Event> restoreSource =
                 restoredEnv.fromSource(
                         restoredSourceProvider.getSource(),
@@ -432,7 +470,8 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         }
     }
 
-    private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, 
int parallelism) {
+    private FlinkSourceProvider getFlinkSourceProvider(
+            List<String> tables, int parallelism, Map<String, String> 
additionalOptions) {
         List<String> fullTableNames =
                 tables.stream()
                         .map(table -> customDatabase.getDatabaseName() + "." + 
table)
@@ -446,6 +485,7 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
         options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
         options.put(SERVER_ID.key(), getServerId(parallelism));
         options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        options.putAll(additionalOptions);
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
                         
org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),

Reply via email to