This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 869131565 [flink] Fix the table list of mysql cdc table synchronization action (#1712)
869131565 is described below
commit 86913156570b8fb3de267b91077485a6162e01e1
Author: JunZhang <[email protected]>
AuthorDate: Wed Aug 9 10:43:17 2023 +0800
[flink] Fix the table list of mysql cdc table synchronization action (#1712)
---
.../action/cdc/mysql/MySqlSyncTableAction.java | 21 +++++++-----
.../cdc/mysql/MySqlSyncTableActionITCase.java | 40 ++++++++++++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 24 +++++++++++++
3 files changed, 76 insertions(+), 9 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 2a421c967..c60f4fa2e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -137,22 +137,25 @@ public class MySqlSyncTableAction extends ActionBase {
String.format(
"mysql-conf [%s] must be specified.",
MySqlSourceOptions.TABLE_NAME.key()));
- String tableList =
- mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
- + "\\."
- + mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
- MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
-
boolean caseSensitive = catalog.caseSensitive();
if (!caseSensitive) {
validateCaseInsensitive();
}
- MySqlSchema mySqlSchema =
+ List<MySqlSchema> mySqlSchemaList =
MySqlActionUtils.getMySqlSchemaList(
- mySqlConfig, monitorTablePredication(), new ArrayList<>())
- .stream()
+ mySqlConfig, monitorTablePredication(), new ArrayList<>());
+
+ String tableList =
+ mySqlSchemaList.stream()
+ .map(m -> m.identifier().getDatabaseName() + "." + m.tableName())
+ .collect(Collectors.joining("|"));
+
+ MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
+
+ MySqlSchema mySqlSchema =
+ mySqlSchemaList.stream()
.reduce(MySqlSchema::merge)
.orElseThrow(
() ->
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9261cf163..9f22898a5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1035,6 +1035,46 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
Arrays.asList("pk", "pt"));
}
+ @Test
+ public void testSyncMultipleTable() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "paimon_multiple_table");
+ mySqlConfig.put("table-name", "t1|t2");
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyList(),
+ Collections.singletonList("id"),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
+ },
+ new String[] {"id", "name"});
+ waitForResult(
+ Arrays.asList("+I[1, flink]", "+I[2, paimon]"),
+ table,
+ rowType,
+ Collections.singletonList("id"));
+ }
+
private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 608e324ed..8971afc1d 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -316,3 +316,27 @@ CREATE TABLE t1 (
_date VARCHAR(10),
PRIMARY KEY (pk)
);
+
+
+-- ################################################################################
+-- testSyncMultipleTable
+-- ################################################################################
+
+CREATE DATABASE paimon_multiple_table;
+USE paimon_multiple_table;
+
+CREATE TABLE t1 (
+ id INT,
+ name VARCHAR(10),
+ PRIMARY KEY (id)
+);
+
+INSERT INTO t1 VALUES (1, 'flink');
+
+CREATE TABLE t2 (
+ id INT,
+ name VARCHAR(10),
+ PRIMARY KEY (id)
+);
+
+INSERT INTO t2 VALUES (2, 'paimon');
\ No newline at end of file