yuzelin commented on code in PR #1258:
URL: https://github.com/apache/incubator-paimon/pull/1258#discussion_r1210140021
##########
paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql:
##########
@@ -390,6 +390,96 @@ CREATE TABLE ignored (
PRIMARY KEY (k)
);
+-- ################################################################################
+-- MySqlSyncDatabaseActionITCase#testSyncMultipleDatabaseIncludingTables
+-- ################################################################################
+
+CREATE DATABASE paimon_sync_database_including_01;
+USE paimon_sync_database_including_01;
+
+CREATE TABLE paimon_1 (
+ k INT,
+ PRIMARY KEY (k)
Review Comment:
The indentation is too deep.
Keep it the same as in the statements above.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java:
##########
@@ -206,6 +206,10 @@ public void build(StreamExecutionEnvironment env) throws Exception {
"No tables to be synchronized. Possible cause is the schemas of all tables in specified "
+ "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
+ String databaseNameStr = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+ mySqlConfig.set(
+ MySqlSourceOptions.DATABASE_NAME,
+ "(" + String.join("|", databaseNameStr.split(",")) + ")");
Review Comment:
I think we should not split the names on `,`; just let the option take a regular expression.
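
To illustrate the difference, here is a minimal sketch, not code from the PR. It assumes the database name option is ultimately interpreted as a regular expression; the class `DatabasePatternSketch` and both method names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of the PR.
final class DatabasePatternSketch {

    // What the diff does: rewrite "a,b" into the alternation "(a|b)".
    static String fromCommaList(String commaSeparated) {
        return "(" + String.join("|", commaSeparated.split(",")) + ")";
    }

    // Suggested alternative: treat the configured value as a regex as-is
    // and test candidate database names against it.
    static boolean matches(String databasePattern, String databaseName) {
        return Pattern.matches(databasePattern, databaseName);
    }

    public static void main(String[] args) {
        // Comma list converted to an alternation.
        System.out.println(fromCommaList("db_01,db_02"));
        // A user-supplied regex covers the same case without any splitting.
        System.out.println(
                matches(
                        "paimon_sync_database_including_.*",
                        "paimon_sync_database_including_01"));
    }
}
```

With a plain regular expression the user can already write `db_01|db_02` themselves, so the extra `,` syntax and the rewriting step would not be needed.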
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java:
##########
@@ -248,21 +252,25 @@ private void validateCaseInsensitive() {
}
private List<MySqlSchema> getMySqlSchemaList() throws Exception {
Review Comment:
After we get the schemas, we should merge the tables that have the same name
into one Paimon table. See
`MySqlSyncTableAction`.
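
Roughly what I mean, as a self-contained sketch. It does not use the real `MySqlSchema` API: `TableSchema`, `of`, and `mergeByTableName` are hypothetical stand-ins, columns are modelled as name-to-type strings, and a type conflict simply fails here, which is a simplification I am assuming for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the types here are stand-ins, not Paimon classes.
final class MergeByTableNameSketch {

    // Simplified schema: source database, table name, column name -> type.
    static final class TableSchema {
        final String database;
        final String table;
        final Map<String, String> columns = new LinkedHashMap<>();

        TableSchema(String database, String table) {
            this.database = database;
            this.table = table;
        }

        // Convenience factory: of("db", "t", "col1", "TYPE1", "col2", "TYPE2", ...)
        static TableSchema of(String database, String table, String... nameTypePairs) {
            if (nameTypePairs.length % 2 != 0) {
                throw new IllegalArgumentException("Expected name/type pairs");
            }
            TableSchema schema = new TableSchema(database, table);
            for (int i = 0; i < nameTypePairs.length; i += 2) {
                schema.columns.put(nameTypePairs[i], nameTypePairs[i + 1]);
            }
            return schema;
        }
    }

    // Groups schemas from all databases by table name and unions their columns.
    static Map<String, Map<String, String>> mergeByTableName(List<TableSchema> schemas) {
        Map<String, Map<String, String>> merged = new LinkedHashMap<>();
        for (TableSchema schema : schemas) {
            Map<String, String> target =
                    merged.computeIfAbsent(schema.table, k -> new LinkedHashMap<>());
            for (Map.Entry<String, String> column : schema.columns.entrySet()) {
                String existing = target.putIfAbsent(column.getKey(), column.getValue());
                // Simplification: identical types only; anything else is rejected.
                if (existing != null && !existing.equals(column.getValue())) {
                    throw new IllegalArgumentException(
                            "Conflicting types for column " + column.getKey()
                                    + " in table " + schema.table);
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Database names ending in _02 are made up for this example.
        List<TableSchema> schemas =
                List.of(
                        TableSchema.of("paimon_sync_database_including_01", "paimon_1", "k", "INT"),
                        TableSchema.of("paimon_sync_database_including_02", "paimon_1", "k", "INT", "v", "VARCHAR(10)"));
        System.out.println(mergeByTableName(schemas));
    }
}
```

The point is only that the result is keyed by table name, so two source tables with the same name end up as one target table.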
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java:
##########
@@ -248,21 +252,25 @@ private void validateCaseInsensitive() {
}
private List<MySqlSchema> getMySqlSchemaList() throws Exception {
- String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+ String databaseListStr = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+ String[] databaseList = databaseListStr.split(",");
Review Comment:
See `MySqlSyncTableAction#getMySqlSchemaList`.
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java:
##########
@@ -479,6 +479,70 @@ private void testTableAffixImpl(Statement statement) throws Exception {
waitForResult(expected, table2, rowType2, primaryKeys2);
}
Review Comment:
We need to test whether the schemas are merged correctly.