This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 15ab85860 [FLINK-35884][pipeline-connector][mysql] MySQL pipeline
connector supports setting the chunk key-column (#3490)
15ab85860 is described below
commit 15ab85860be304d5ae1cc343fd9adce715bcc4dc
Author: Junbo wang <[email protected]>
AuthorDate: Mon Aug 12 15:31:36 2024 +0800
[FLINK-35884][pipeline-connector][mysql] MySQL pipeline connector supports
setting the chunk key-column (#3490)
Co-authored-by: wangjunbo <[email protected]>
Co-authored-by: Hang Ruan <[email protected]>
---
.../mysql/factory/MySqlDataSourceFactory.java | 41 ++++++++++++++++++++++
.../mysql/source/MySqlDataSourceOptions.java | 10 ++++++
.../mysql/source/MySqlDataSourceFactoryTest.java | 37 +++++++++++++++++++
.../src/test/resources/ddl/inventory.sql | 20 +++++++++++
4 files changed, 108 insertions(+)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index e8c39ce3d..7f7691961 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -36,12 +36,14 @@ import
org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.ZoneId;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -60,6 +62,7 @@ import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@@ -184,6 +187,35 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
}
configFactory.tableList(capturedTables.toArray(new String[0]));
+ String chunkKeyColumns =
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+ if (chunkKeyColumns != null) {
+ Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
+ List<TableId> tableIds =
+ MySqlSchemaUtils.listTables(configFactory.createConfig(0),
null);
+ for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
+ String[] splits = chunkKeyColumn.split(":");
+ if (splits.length == 2) {
+ Selectors chunkKeySelector =
+ new
Selectors.SelectorsBuilder().includeTables(splits[0]).build();
+ List<ObjectPath> tableList =
+ getChunkKeyColumnTableList(tableIds,
chunkKeySelector);
+ for (ObjectPath table : tableList) {
+ chunkKeyColumnMap.put(table, splits[1]);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()
+ + " = "
+ + chunkKeyColumns
+ + " failed to be parsed in this part '"
+ + chunkKeyColumn
+ + "'.");
+ }
+ }
+ LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
+ configFactory.chunkKeyColumn(chunkKeyColumnMap);
+ }
+
return new MySqlDataSource(configFactory);
}
@@ -219,6 +251,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
@@ -246,6 +279,14 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
.collect(Collectors.toList());
}
+ private static List<ObjectPath> getChunkKeyColumnTableList(
+ List<TableId> tableIds, Selectors selectors) {
+ return tableIds.stream()
+ .filter(selectors::isMatch)
+ .map(tableId -> new ObjectPath(tableId.getSchemaName(),
tableId.getTableName()))
+ .collect(Collectors.toList());
+ }
+
private static StartupOptions getStartupOptions(Configuration config) {
String modeString = config.get(SCAN_STARTUP_MODE);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index f6f1a671a..9a18350b3 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -214,6 +214,16 @@ public class MySqlDataSourceOptions {
+ " and the query MySQL for splitting
would happen when it is uneven."
+ " The distribution factor could be
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+ @Experimental
+ public static final ConfigOption<String>
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+ ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The chunk key of table snapshot, captured tables
are split into multiple chunks by a chunk key when read the snapshot of table."
+ + "By default, the chunk key is the first
column of the primary key."
+ + "eg.
db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;");
+
@Experimental
public static final ConfigOption<Boolean>
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index b1aab84b0..c7b442480 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectPath;
import org.junit.Test;
@@ -38,6 +39,7 @@ import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
@@ -107,6 +109,7 @@ public class MySqlDataSourceFactoryTest extends
MySqlSourceTestBase {
.isEqualTo(
Arrays.asList(
inventoryDatabase.getDatabaseName() +
".customers",
+ inventoryDatabase.getDatabaseName() +
".multi_max_table",
inventoryDatabase.getDatabaseName() +
".products"));
}
@@ -239,6 +242,40 @@ public class MySqlDataSourceFactoryTest extends
MySqlSourceTestBase {
.isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() +
".products"));
}
+ @Test
+ public void testAddChunkKeyColumns() {
+ inventoryDatabase.createAndInitialize();
+ Map<String, String> options = new HashMap<>();
+ options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+ options.put(PORT.key(),
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+ options.put(USERNAME.key(), TEST_USER);
+ options.put(PASSWORD.key(), TEST_PASSWORD);
+ options.put(TABLES.key(), inventoryDatabase.getDatabaseName() +
".\\.*");
+ options.put(
+ SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key(),
+ inventoryDatabase.getDatabaseName()
+ + ".multi_max_\\.*:order_id;"
+ + inventoryDatabase.getDatabaseName()
+ + ".products:id;");
+ Factory.Context context = new
MockContext(Configuration.fromMap(options));
+
+ MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+ MySqlDataSource dataSource = (MySqlDataSource)
factory.createDataSource(context);
+ ObjectPath multiMaxTable =
+ new ObjectPath(inventoryDatabase.getDatabaseName(),
"multi_max_table");
+ ObjectPath productsTable = new
ObjectPath(inventoryDatabase.getDatabaseName(), "products");
+
+ assertThat(dataSource.getSourceConfig().getChunkKeyColumns())
+ .isNotEmpty()
+ .isEqualTo(
+ new HashMap<ObjectPath, String>() {
+ {
+ put(multiMaxTable, "order_id");
+ put(productsTable, "id");
+ }
+ });
+ }
+
class MockContext implements Factory.Context {
Configuration factoryConfiguration;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
index 86d985e99..626107f5c 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
@@ -69,4 +69,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-02-19', 1002, 2, 106),
(default, '16-02-21', 1003, 1, 107);
+CREATE TABLE `multi_max_table`
+(
+ `order_id` varchar(128) NOT NULL,
+ `index` int(11) NOT NULL,
+ `desc` varchar(512) NOT NULL,
+ PRIMARY KEY (`order_id`, `index`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+INSERT INTO multi_max_table
+VALUES ('', 0, 'flink'),
+ ('', 1, 'flink'),
+ ('', 2, 'flink'),
+ ('a', 0, 'flink'),
+ ('b', 0, 'flink'),
+ ('c', 0, 'flink'),
+ ('d', 0, 'flink'),
+ ('E', 0, 'flink'),
+ ('E', 1, 'flink'),
+ ('E', 2, 'flink'),
+ ('e', 4, 'flink'),
+ ('E', 3, 'flink');