This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 28bd19348 [flink] Sync the table with same name in different  db (#1743)
28bd19348 is described below

commit 28bd193488bfee3a0b033eb38d6b72a939d6c00d
Author: JunZhang <[email protected]>
AuthorDate: Wed Aug 9 10:18:20 2023 +0800

    [flink] Sync the table with same name in different  db (#1743)
---
 docs/content/how-to/cdc-ingestion.md               |  6 +--
 .../shortcodes/generated/mysql_sync_database.html  |  2 +-
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |  5 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 13 ++---
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 63 +++++++++++++++++++---
 .../mysql/MySqlSyncDatabaseTableListITCase.java    |  4 +-
 6 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 4b13d0d08..f102753bf 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -196,7 +196,7 @@ under database `source_db`. The command to submit the job looks like:
     --table-conf bucket=4 \
     --table-conf changelog-producer=input \
     --table-conf sink.parallelism=4 \
-    --including-tables 'product|user|address'
+    --including-tables 'source_db.product|source_db.user|source_db.address'
 ```
 
 At a later point we would like the job to also synchronize tables [order, custom], 
@@ -222,7 +222,7 @@ The command to recover from previous snapshot and add new tables to synchronize
     --catalog-conf metastore=hive \
     --catalog-conf uri=thrift://hive-metastore:9083 \
     --table-conf bucket=4 \
-    --including-tables 'product|user|address|order|custom'
+    --including-tables 'source_db.product|source_db.user|source_db.address|source_db.order|source_db.custom'
 ```
 
 {{< hint info >}}
@@ -249,7 +249,7 @@ synchronize all the `db.+.tbl.+` into tables `test_db.tbl1`, `test_db.tbl2` ...
     --table-conf bucket=4 \
     --table-conf changelog-producer=input \
     --table-conf sink.parallelism=4 \
-    --including-tables 'tbl.+'
+    --including-tables 'db.+.tbl.+'
 ```
 
 By setting database-name to a regular expression, the synchronization job will capture all tables under matched databases 
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index fe6f3d5e3..6df0c04bc 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -51,7 +51,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--including-tables</h5></td>
-        <td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
+        <td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.</td>
     </tr>
     <tr>
         <td><h5>--excluding-tables</h5></td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 418db64d2..344bc087c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -155,7 +155,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
         try {
             root = objectMapper.readValue(rawEvent, JsonNode.class);
             payload = root.get("payload");
-            currentTable = payload.get("source").get("table").asText();
+            currentTable = getDatabaseName() + "." + payload.get("source").get("table").asText();
             shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -164,7 +164,8 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
 
     @Override
     public String parseTableName() {
-        return tableNameConverter.convert(Identifier.create(getDatabaseName(), currentTable));
+        String tableName = payload.get("source").get("table").asText();
+        return tableNameConverter.convert(Identifier.create(getDatabaseName(), tableName));
     }
 
     private boolean isSchemaChange() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index a6746904e..5a6819ee1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -295,7 +295,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         schema.identifier());
                 return false;
             }
-            return shouldMonitorTable(schema.tableName());
+            return shouldMonitorTable(
+                    schema.identifier().getDatabaseName() + "." + schema.tableName());
         };
     }
 
@@ -409,14 +410,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
             // a table can be monitored only when its name meets the including pattern and doesn't
             // be excluded by excluding pattern at the same time
-            String includingPattern =
-                    String.format(
-                            "%s%s(%s)",
-                            mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
-                            separatorRex,
-                            includingTables);
             if (excludedTables.isEmpty()) {
-                return includingPattern;
+                return includingTables;
             }
 
             String excludingPattern =
@@ -430,7 +425,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                                                             + t.getObjectName()))
                             .collect(Collectors.joining("|"));
             excludingPattern = "?!" + excludingPattern;
-            return String.format("(%s)(%s)", excludingPattern, includingPattern);
+            return String.format("(%s)(%s)", excludingPattern, includingTables);
         }
 
         throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " + mode);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index eeb76d8be..379f916dc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -563,7 +563,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
     public void testIncludingTables() throws Exception {
         includingAndExcludingTablesImpl(
                 "paimon_sync_database_including",
-                "flink|paimon.+",
+                "paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+",
                 null,
                 Arrays.asList("flink", "paimon_1", "paimon_2"),
                 Collections.singletonList("ignored"));
@@ -575,18 +575,67 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
         includingAndExcludingTablesImpl(
                 "paimon_sync_database_excluding",
                 null,
-                "flink|paimon.+",
+                "paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+",
                 Collections.singletonList("sync"),
                 Arrays.asList("flink", "paimon_1", "paimon_2"));
     }
 
+    @Test
+    @Timeout(60)
+    public void testSameTableNameInDifferentDatabase() throws Exception {
+        String databaseName = "paimon_sync_database_excluding|paimon_sync_database_including";
+        String includingTables = "paimon_sync_database_including.paimon_1";
+        String excludingTables = "paimon_sync_database_excluding.paimon_1";
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", databaseName);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        true,
+                        null,
+                        null,
+                        includingTables,
+                        excludingTables,
+                        Collections.emptyMap(),
+                        tableConfig,
+                        DIVIDED);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Statement statement = getStatement()) {
+            statement.executeUpdate(
+                    "INSERT INTO paimon_sync_database_including.paimon_1 VALUES (1),(2)");
+            statement.executeUpdate(
+                    "INSERT INTO paimon_sync_database_excluding.paimon_1 VALUES (3),(4)");
+
+            FileStoreTable table = getFileStoreTable("paimon_1");
+            RowType rowType =
+                    RowType.of(new DataType[] {DataTypes.INT().notNull()}, new String[] {"k"});
+            List<String> primaryKeys = Collections.singletonList("k");
+            // only the paimon_sync_database_including.paimon_1 is synchronized
+            waitForResult(Arrays.asList("+I[1]", "+I[2]"), table, rowType, primaryKeys);
+        }
+    }
+
     @Test
     @Timeout(60)
     public void testIncludingAndExcludingTables() throws Exception {
         includingAndExcludingTablesImpl(
                 "paimon_sync_database_in_excluding",
-                "flink|paimon.+",
-                "paimon_1",
+                "paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+",
+                "paimon_sync_database_in_excluding.paimon_1",
                 Arrays.asList("flink", "paimon_2"),
                 Arrays.asList("paimon_1", "test"));
     }
@@ -756,8 +805,8 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         true,
                         null,
                         null,
-                        "t.+",
-                        ".*a$",
+                        mySqlDatabase + ".t.+",
+                        mySqlDatabase + "..*a$",
                         Collections.emptyMap(),
                         tableConfig,
                         COMBINED);
@@ -1057,7 +1106,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         true,
                         null,
                         null,
-                        "t.+",
+                        databaseName + ".t.+",
                         null,
                         catalogConfig,
                         tableConfig,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index faf9f2977..657b1ab56 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -69,8 +69,8 @@ public class MySqlSyncDatabaseTableListITCase extends MySqlActionITCaseBase {
                         false,
                         null,
                         null,
-                        "t.+|s.+",
-                        "ta|sa",
+                        ".*shard_.*\\.t.+|.*shard_.*\\.s.+",
+                        ".*shard_.*\\.ta|.*shard_.*\\.sa",
                         Collections.emptyMap(),
                         tableConfig,
                         mode);

Reply via email to