This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ab85860 [FLINK-35884][pipeline-connector][mysql] MySQL pipeline 
connector supports to set chunk key-column (#3490)
15ab85860 is described below

commit 15ab85860be304d5ae1cc343fd9adce715bcc4dc
Author: Junbo wang <[email protected]>
AuthorDate: Mon Aug 12 15:31:36 2024 +0800

    [FLINK-35884][pipeline-connector][mysql] MySQL pipeline connector supports 
to set chunk key-column (#3490)
    
    
    Co-authored-by: wangjunbo <[email protected]>
    Co-authored-by: Hang Ruan <[email protected]>
---
 .../mysql/factory/MySqlDataSourceFactory.java      | 41 ++++++++++++++++++++++
 .../mysql/source/MySqlDataSourceOptions.java       | 10 ++++++
 .../mysql/source/MySqlDataSourceFactoryTest.java   | 37 +++++++++++++++++++
 .../src/test/resources/ddl/inventory.sql           | 20 +++++++++++
 4 files changed, 108 insertions(+)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index e8c39ce3d..7f7691961 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -36,12 +36,14 @@ import 
org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
 import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectPath;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +62,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@@ -184,6 +187,35 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         }
         configFactory.tableList(capturedTables.toArray(new String[0]));
 
+        String chunkKeyColumns = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+        if (chunkKeyColumns != null) {
+            Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
+            List<TableId> tableIds =
+                    MySqlSchemaUtils.listTables(configFactory.createConfig(0), 
null);
+            for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
+                String[] splits = chunkKeyColumn.split(":");
+                if (splits.length == 2) {
+                    Selectors chunkKeySelector =
+                            new 
Selectors.SelectorsBuilder().includeTables(splits[0]).build();
+                    List<ObjectPath> tableList =
+                            getChunkKeyColumnTableList(tableIds, 
chunkKeySelector);
+                    for (ObjectPath table : tableList) {
+                        chunkKeyColumnMap.put(table, splits[1]);
+                    }
+                } else {
+                    throw new IllegalArgumentException(
+                            SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()
+                                    + " = "
+                                    + chunkKeyColumns
+                                    + " failed to be parsed in this part '"
+                                    + chunkKeyColumn
+                                    + "'.");
+                }
+            }
+            LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
+            configFactory.chunkKeyColumn(chunkKeyColumnMap);
+        }
+
         return new MySqlDataSource(configFactory);
     }
 
@@ -219,6 +251,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         options.add(CONNECTION_POOL_SIZE);
         options.add(HEARTBEAT_INTERVAL);
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(CHUNK_META_GROUP_SIZE);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
@@ -246,6 +279,14 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Converts every table in {@code tableIds} that is accepted by {@code selectors} into an
+     * {@link ObjectPath} made of the table's schema name and table name.
+     *
+     * @param tableIds candidate tables to filter
+     * @param selectors selector deciding which candidates are kept
+     * @return object paths of the matching tables, in the encounter order of {@code tableIds}
+     */
+    private static List<ObjectPath> getChunkKeyColumnTableList(
+            List<TableId> tableIds, Selectors selectors) {
+        return tableIds.stream()
+                .filter(candidate -> selectors.isMatch(candidate))
+                .map(matched -> new ObjectPath(matched.getSchemaName(), matched.getTableName()))
+                .collect(Collectors.toList());
+    }
+
     private static StartupOptions getStartupOptions(Configuration config) {
         String modeString = config.get(SCAN_STARTUP_MODE);
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index f6f1a671a..9a18350b3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -214,6 +214,16 @@ public class MySqlDataSourceOptions {
                                     + " and the query MySQL for splitting 
would happen when it is uneven."
                                     + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
 
+    /**
+     * Chunk key column used to split captured tables during the snapshot phase. The value is a
+     * list of {@code <table pattern>:<column>} entries separated by {@code ;}.
+     */
+    @Experimental
+    public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    // Leading spaces keep the concatenated sentences from
+                                    // running together in the rendered description.
+                                    + " By default, the chunk key is the first column of the primary key."
+                                    + " eg. db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;");
+
     @Experimental
     public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
             ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index b1aab84b0..c7b442480 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.factories.Factory;
 import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectPath;
 
 import org.junit.Test;
 
@@ -38,6 +39,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
@@ -107,6 +109,7 @@ public class MySqlDataSourceFactoryTest extends 
MySqlSourceTestBase {
                 .isEqualTo(
                         Arrays.asList(
                                 inventoryDatabase.getDatabaseName() + 
".customers",
+                                inventoryDatabase.getDatabaseName() + 
".multi_max_table",
                                 inventoryDatabase.getDatabaseName() + 
".products"));
     }
 
@@ -239,6 +242,40 @@ public class MySqlDataSourceFactoryTest extends 
MySqlSourceTestBase {
                 .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + 
".products"));
     }
 
+    /**
+     * Checks that the chunk key-column option is resolved against the captured tables: one entry
+     * given as a table pattern and one given as an exact table name must each appear in the
+     * source config as an {@link ObjectPath} mapped to the configured column.
+     */
+    @Test
+    public void testAddChunkKeyColumns() {
+        inventoryDatabase.createAndInitialize();
+        String databaseName = inventoryDatabase.getDatabaseName();
+
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), databaseName + ".\\.*");
+        options.put(
+                SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key(),
+                databaseName + ".multi_max_\\.*:order_id;" + databaseName + ".products:id;");
+        Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
+
+        // Plain HashMap instead of double-brace initialization, which would create an anonymous
+        // HashMap subclass holding a reference to the enclosing test instance.
+        Map<ObjectPath, String> expectedChunkKeyColumns = new HashMap<>();
+        expectedChunkKeyColumns.put(new ObjectPath(databaseName, "multi_max_table"), "order_id");
+        expectedChunkKeyColumns.put(new ObjectPath(databaseName, "products"), "id");
+
+        assertThat(dataSource.getSourceConfig().getChunkKeyColumns())
+                .isNotEmpty()
+                .isEqualTo(expectedChunkKeyColumns);
+    }
+
     class MockContext implements Factory.Context {
 
         Configuration factoryConfiguration;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
index 86d985e99..626107f5c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
@@ -69,4 +69,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
        (default, '2016-02-19', 1002, 2, 106),
        (default, '16-02-21', 1003, 1, 107);
 
+-- Table with a composite primary key (`order_id`, `index`).
+-- Seed rows include empty-string `order_id` values and values that differ only by
+-- letter case ('E' vs 'e'), each combined with several `index` values.
+CREATE TABLE `multi_max_table`
+(
+    `order_id`   varchar(128) NOT NULL,
+    `index`   int(11) NOT NULL,
+    `desc`  varchar(512) NOT NULL,
+    PRIMARY KEY (`order_id`, `index`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+INSERT INTO multi_max_table
+VALUES ('', 0, 'flink'),
+       ('', 1, 'flink'),
+       ('', 2, 'flink'),
+       ('a', 0, 'flink'),
+       ('b', 0, 'flink'),
+       ('c', 0, 'flink'),
+       ('d', 0, 'flink'),
+       ('E', 0, 'flink'),
+       ('E', 1, 'flink'),
+       ('E', 2, 'flink'),
+       ('e', 4, 'flink'),
+       ('E', 3, 'flink');
 

Reply via email to