This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 869131565 [flink] Fix the table list of mysql cdc table synchronization action (#1712)
869131565 is described below

commit 86913156570b8fb3de267b91077485a6162e01e1
Author: JunZhang <[email protected]>
AuthorDate: Wed Aug 9 10:43:17 2023 +0800

    [flink] Fix the table list of mysql cdc table synchronization action (#1712)
---
 .../action/cdc/mysql/MySqlSyncTableAction.java     | 21 +++++++-----
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 40 ++++++++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  | 24 +++++++++++++
 3 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 2a421c967..c60f4fa2e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -137,22 +137,25 @@ public class MySqlSyncTableAction extends ActionBase {
                 String.format(
                         "mysql-conf [%s] must be specified.", MySqlSourceOptions.TABLE_NAME.key()));
 
-        String tableList =
-                mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
-                        + "\\."
-                        + mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
-        MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
-
         boolean caseSensitive = catalog.caseSensitive();
 
         if (!caseSensitive) {
             validateCaseInsensitive();
         }
 
-        MySqlSchema mySqlSchema =
+        List<MySqlSchema> mySqlSchemaList =
                 MySqlActionUtils.getMySqlSchemaList(
-                                mySqlConfig, monitorTablePredication(), new ArrayList<>())
-                        .stream()
+                        mySqlConfig, monitorTablePredication(), new ArrayList<>());
+
+        String tableList =
+                mySqlSchemaList.stream()
+                        .map(m -> m.identifier().getDatabaseName() + "." + m.tableName())
+                        .collect(Collectors.joining("|"));
+
+        MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
+
+        MySqlSchema mySqlSchema =
+                mySqlSchemaList.stream()
                         .reduce(MySqlSchema::merge)
                         .orElseThrow(
                                 () ->
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9261cf163..9f22898a5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1035,6 +1035,46 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                 Arrays.asList("pk", "pt"));
     }
 
+    @Test
+    public void testSyncMultipleTable() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "paimon_multiple_table");
+        mySqlConfig.put("table-name", "t1|t2");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.singletonList("id"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        FileStoreTable table = getFileStoreTable();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
+                        },
+                        new String[] {"id", "name"});
+        waitForResult(
+                Arrays.asList("+I[1, flink]", "+I[2, paimon]"),
+                table,
+                rowType,
+                Collections.singletonList("id"));
+    }
+
     private FileStoreTable getFileStoreTable() throws Exception {
         return getFileStoreTable(tableName);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 608e324ed..8971afc1d 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -316,3 +316,27 @@ CREATE TABLE t1 (
     _date VARCHAR(10),
     PRIMARY KEY (pk)
 );
+
+
+-- ################################################################################
+--  testSyncMultipleTable
+-- ################################################################################
+
+CREATE DATABASE paimon_multiple_table;
+USE paimon_multiple_table;
+
+CREATE TABLE t1 (
+    id INT,
+    name VARCHAR(10),
+    PRIMARY KEY (id)
+);
+
+INSERT INTO t1 VALUES (1, 'flink');
+
+CREATE TABLE t2 (
+    id INT,
+    name VARCHAR(10),
+    PRIMARY KEY (id)
+);
+
+INSERT INTO t2 VALUES (2, 'paimon');
\ No newline at end of file

Reply via email to