This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 28bd19348 [flink] Sync the table with same name in different db
(#1743)
28bd19348 is described below
commit 28bd193488bfee3a0b033eb38d6b72a939d6c00d
Author: JunZhang <[email protected]>
AuthorDate: Wed Aug 9 10:18:20 2023 +0800
[flink] Sync the table with same name in different db (#1743)
---
docs/content/how-to/cdc-ingestion.md | 6 +--
.../shortcodes/generated/mysql_sync_database.html | 2 +-
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 5 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 13 ++---
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 63 +++++++++++++++++++---
.../mysql/MySqlSyncDatabaseTableListITCase.java | 4 +-
6 files changed, 69 insertions(+), 24 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 4b13d0d08..f102753bf 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -196,7 +196,7 @@ under database `source_db`. The command to submit the job
looks like:
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4 \
- --including-tables 'product|user|address'
+ --including-tables 'source_db.product|source_db.user|source_db.address'
```
At a later point we would like the job to also synchronize tables [order,
custom],
@@ -222,7 +222,7 @@ The command to recover from previous snapshot and add new
tables to synchronize
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
- --including-tables 'product|user|address|order|custom'
+ --including-tables
'source_db.product|source_db.user|source_db.address|source_db.order|source_db.custom'
```
{{< hint info >}}
@@ -249,7 +249,7 @@ synchronize all the `db.+.tbl.+` into tables
`test_db.tbl1`, `test_db.tbl2` ...
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4 \
- --including-tables 'tbl.+'
+ --including-tables 'db.+.tbl.+'
```
By setting database-name to a regular expression, the synchronization job will
capture all tables under matched databases
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index fe6f3d5e3..6df0c04bc 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -51,7 +51,7 @@ under the License.
</tr>
<tr>
<td><h5>--including-tables</h5></td>
- <td>It is used to specify which source tables are to be synchronized.
You must use '|' to separate multiple tables.Because '|' is a special
character, a comma is required, for example: 'a|b|c'.Regular expression is
supported, for example, specifying "--including-tables test|paimon.*" means to
synchronize table 'test' and all tables start with 'paimon'.</td>
+ <td>It is used to specify which source tables are to be synchronized.
Each table must be qualified with its database name, in the form 'database.table'.
You must use '|' to separate multiple tables. Because '|' is a special shell
character, the value must be quoted, for example: 'db1.a|db2.b|db2.c'. Regular
expressions are supported; for example, specifying "--including-tables
'db1.test|db2.paimon.*'" means to synchronize table 'db1.test' and all tables
starting with 'paimon' in database 'db2'.</td>
</tr>
<tr>
<td><h5>--excluding-tables</h5></td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 418db64d2..344bc087c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -155,7 +155,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
try {
root = objectMapper.readValue(rawEvent, JsonNode.class);
payload = root.get("payload");
- currentTable = payload.get("source").get("table").asText();
+ currentTable = getDatabaseName() + "." +
payload.get("source").get("table").asText();
shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -164,7 +164,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public String parseTableName() {
- return tableNameConverter.convert(Identifier.create(getDatabaseName(),
currentTable));
+ String tableName = payload.get("source").get("table").asText();
+ return tableNameConverter.convert(Identifier.create(getDatabaseName(),
tableName));
}
private boolean isSchemaChange() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index a6746904e..5a6819ee1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -295,7 +295,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
schema.identifier());
return false;
}
- return shouldMonitorTable(schema.tableName());
+ return shouldMonitorTable(
+ schema.identifier().getDatabaseName() + "." +
schema.tableName());
};
}
@@ -409,14 +410,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
// a table can be monitored only when its name meets the including
pattern and doesn't
// be excluded by excluding pattern at the same time
- String includingPattern =
- String.format(
- "%s%s(%s)",
- mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
- separatorRex,
- includingTables);
if (excludedTables.isEmpty()) {
- return includingPattern;
+ return includingTables;
}
String excludingPattern =
@@ -430,7 +425,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+
t.getObjectName()))
.collect(Collectors.joining("|"));
excludingPattern = "?!" + excludingPattern;
- return String.format("(%s)(%s)", excludingPattern,
includingPattern);
+ return String.format("(%s)(%s)", excludingPattern,
includingTables);
}
throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " +
mode);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index eeb76d8be..379f916dc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -563,7 +563,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
public void testIncludingTables() throws Exception {
includingAndExcludingTablesImpl(
"paimon_sync_database_including",
- "flink|paimon.+",
+
"paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+",
null,
Arrays.asList("flink", "paimon_1", "paimon_2"),
Collections.singletonList("ignored"));
@@ -575,18 +575,67 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
includingAndExcludingTablesImpl(
"paimon_sync_database_excluding",
null,
- "flink|paimon.+",
+
"paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+",
Collections.singletonList("sync"),
Arrays.asList("flink", "paimon_1", "paimon_2"));
}
+ @Test
+ @Timeout(60)
+ public void testSameTableNameInDifferentDatabase() throws Exception {
+ String databaseName =
"paimon_sync_database_excluding|paimon_sync_database_including";
+ String includingTables = "paimon_sync_database_including.paimon_1";
+ String excludingTables = "paimon_sync_database_excluding.paimon_1";
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", databaseName);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ true,
+ null,
+ null,
+ includingTables,
+ excludingTables,
+ Collections.emptyMap(),
+ tableConfig,
+ DIVIDED);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate(
+ "INSERT INTO paimon_sync_database_including.paimon_1
VALUES (1),(2)");
+ statement.executeUpdate(
+ "INSERT INTO paimon_sync_database_excluding.paimon_1
VALUES (3),(4)");
+
+ FileStoreTable table = getFileStoreTable("paimon_1");
+ RowType rowType =
+ RowType.of(new DataType[] {DataTypes.INT().notNull()}, new
String[] {"k"});
+ List<String> primaryKeys = Collections.singletonList("k");
+ // only the paimon_sync_database_including.paimon_1 is synchronized
+ waitForResult(Arrays.asList("+I[1]", "+I[2]"), table, rowType,
primaryKeys);
+ }
+ }
+
@Test
@Timeout(60)
public void testIncludingAndExcludingTables() throws Exception {
includingAndExcludingTablesImpl(
"paimon_sync_database_in_excluding",
- "flink|paimon.+",
- "paimon_1",
+
"paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+",
+ "paimon_sync_database_in_excluding.paimon_1",
Arrays.asList("flink", "paimon_2"),
Arrays.asList("paimon_1", "test"));
}
@@ -756,8 +805,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
true,
null,
null,
- "t.+",
- ".*a$",
+ mySqlDatabase + ".t.+",
+ mySqlDatabase + "..*a$",
Collections.emptyMap(),
tableConfig,
COMBINED);
@@ -1057,7 +1106,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
true,
null,
null,
- "t.+",
+ databaseName + ".t.+",
null,
catalogConfig,
tableConfig,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index faf9f2977..657b1ab56 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -69,8 +69,8 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
false,
null,
null,
- "t.+|s.+",
- "ta|sa",
+ ".*shard_.*\\.t.+|.*shard_.*\\.s.+",
+ ".*shard_.*\\.ta|.*shard_.*\\.sa",
Collections.emptyMap(),
tableConfig,
mode);