This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d22e1538 [flink][mysql-cdc] Support database synchronization without
merging shards (#1721)
1d22e1538 is described below
commit 1d22e15385f7d46b5a6177a64bbb6145f6dc6f08
Author: yuzelin <[email protected]>
AuthorDate: Thu Aug 3 13:35:19 2023 +0800
[flink][mysql-cdc] Support database synchronization without merging shards
(#1721)
---
docs/content/how-to/cdc-ingestion.md | 6 +
.../shortcodes/generated/mysql_sync_database.html | 4 +
.../flink/action/cdc/TableNameConverter.java | 19 +-
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 2 +-
.../flink/action/cdc/mysql/MySqlActionUtils.java | 2 +-
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 7 +-
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 7 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 57 +++-
.../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 11 +
.../action/cdc/mysql/MySqlActionITCaseBase.java | 13 +
.../flink/action/cdc/mysql/MySqlContainer.java | 14 +-
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 295 ++++++++++++++-------
.../mysql/MySqlSyncDatabaseTableListITCase.java | 121 ++-------
.../cdc/mysql/MySqlSyncTableActionITCase.java | 72 ++---
.../test/resources/mysql/sync_database_setup.sql | 75 ++++++
.../test/resources/mysql/tablelist_test_setup.sql | 30 ---
16 files changed, 438 insertions(+), 297 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index a071156ff..066683cd5 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -140,6 +140,7 @@ To use this feature through `flink run`, run the following
shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--ignore-incompatible <true/false>] \
+ [--merge-shards <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
@@ -254,6 +255,11 @@ synchronize all the `db.+.tbl.+` into tables
`test_db.tbl1`, `test_db.tbl2` ...
By setting database-name to a regular expression, the synchronization job will
capture all tables under matched databases
and merge tables of the same name into one table.
+{{< hint info >}}
+You can set `--merge-shards false` to prevent merging shards. The synchronized
tables will be named 'databaseName_tableName'
+to avoid potential name conflicts.
+{{< /hint >}}
+
## Kafka
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index 8336fe6c4..fe6f3d5e3 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -37,6 +37,10 @@ under the License.
<td><h5>--ignore-incompatible</h5></td>
<td>It is default false, in this case, if MySQL table name exists in
Paimon and their schema is incompatible,an exception will be thrown. You can
specify it to true explicitly to ignore the incompatible tables and
exception.</td>
</tr>
+ <tr>
+ <td><h5>--merge-shards</h5></td>
+ <td>It is true by default. In this case, if some tables in different
databases have the same name, their schemas will be merged and their records
will be synchronized into one Paimon table. Otherwise, each table's records
will be synchronized to a corresponding Paimon table, and the Paimon table will
be named 'databaseName_tableName' to avoid potential name conflicts.</td>
+ </tr>
<tr>
<td><h5>--table-prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized. For example,
if you want all synchronized tables to have "ods_" as prefix, you can specify
"--table-prefix ods_".</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 692b10252..67c70aa58 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc;
+import org.apache.paimon.catalog.Identifier;
+
import java.io.Serializable;
/** Used to convert a MySQL source table name to corresponding Paimon table
name. */
@@ -26,15 +28,18 @@ public class TableNameConverter implements Serializable {
private static final long serialVersionUID = 1L;
private final boolean caseSensitive;
+ private final boolean mergeShards;
private final String prefix;
private final String suffix;
public TableNameConverter(boolean caseSensitive) {
- this(caseSensitive, "", "");
+ this(caseSensitive, true, "", "");
}
- public TableNameConverter(boolean caseSensitive, String prefix, String
suffix) {
+ public TableNameConverter(
+ boolean caseSensitive, boolean mergeShards, String prefix, String
suffix) {
this.caseSensitive = caseSensitive;
+ this.mergeShards = mergeShards;
this.prefix = prefix;
this.suffix = suffix;
}
@@ -43,4 +48,14 @@ public class TableNameConverter implements Serializable {
String tableName = caseSensitive ? originName :
originName.toLowerCase();
return prefix + tableName + suffix;
}
+
+ public String convert(Identifier originIdentifier) {
+ String rawName =
+ mergeShards
+ ? originIdentifier.getObjectName()
+ : originIdentifier.getDatabaseName()
+ + "_"
+ + originIdentifier.getObjectName();
+ return convert(rawName);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 0791b3df1..6ec6b3864 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -134,7 +134,7 @@ public class KafkaSyncDatabaseAction extends ActionBase {
catalog.createDatabase(database, true);
TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, tablePrefix,
tableSuffix);
+ new TableNameConverter(caseSensitive, true, tablePrefix,
tableSuffix);
KafkaSource<String> source =
KafkaActionUtils.buildKafkaSource(kafkaConfig);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index fdd99975d..add673f8b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -200,7 +200,7 @@ public class MySqlActionUtils {
!mySqlFields.containsKey(fieldName.toLowerCase()),
String.format(
"Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
- fieldName, mySqlSchema.identifier()));
+ fieldName, mySqlSchema.tableName()));
mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
}
mySqlPrimaryKeys =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 2053d5fc4..4aac385b2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -23,6 +23,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -163,7 +164,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public String parseTableName() {
- return tableNameConverter.convert(currentTable);
+ return tableNameConverter.convert(Identifier.create(getDatabaseName(),
currentTable));
}
private boolean isSchemaChange() {
@@ -290,6 +291,10 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
return records;
}
+ private String getDatabaseName() {
+ return payload.get("source").get("db").asText();
+ }
+
private Map<String, String> extractRow(JsonNode recordRow) {
JsonNode schema =
Preconditions.checkNotNull(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index c88637537..6626fd762 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -32,15 +32,18 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
/** Utility class to load MySQL table schema with JDBC. */
public class MySqlSchema {
private final String databaseName;
private final String tableName;
-
private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
private final List<String> primaryKeys;
+ private boolean hasMerged = false;
+
public MySqlSchema(
DatabaseMetaData metaData,
String databaseName,
@@ -88,6 +91,7 @@ public class MySqlSchema {
}
public Identifier identifier() {
+ checkState(!hasMerged, "Cannot get table identifier from merged
schema.");
return Identifier.create(databaseName, tableName);
}
@@ -106,6 +110,7 @@ public class MySqlSchema {
}
public MySqlSchema merge(MySqlSchema other) {
+ hasMerged = true;
for (Map.Entry<String, Tuple2<DataType, String>> entry :
other.fields.entrySet()) {
String fieldName = entry.getKey();
DataType newType = entry.getValue().f0;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index cf948e091..a6746904e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
@@ -101,6 +102,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
private final Configuration mySqlConfig;
private final String database;
private final boolean ignoreIncompatible;
+ private final boolean mergeShards;
private final String tablePrefix;
private final String tableSuffix;
private final Pattern includingPattern;
@@ -109,6 +111,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
private final String includingTables;
private final DatabaseSyncMode mode;
+ private List<Identifier> monitoredTables;
+
public MySqlSyncDatabaseAction(
Map<String, String> mySqlConfig,
String warehouse,
@@ -121,6 +125,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
warehouse,
database,
ignoreIncompatible,
+ true,
null,
null,
null,
@@ -135,6 +140,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String warehouse,
String database,
boolean ignoreIncompatible,
+ boolean mergeShards,
@Nullable String tablePrefix,
@Nullable String tableSuffix,
@Nullable String includingTables,
@@ -146,6 +152,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
this.mySqlConfig = Configuration.fromMap(mySqlConfig);
this.database = database;
this.ignoreIncompatible = ignoreIncompatible;
+ this.mergeShards = mergeShards;
this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
this.includingTables = includingTables == null ? ".*" :
includingTables;
@@ -172,9 +179,9 @@ public class MySqlSyncDatabaseAction extends ActionBase {
List<MySqlSchema> beforeMerging =
MySqlActionUtils.getMySqlSchemaList(
mySqlConfig, monitorTablePredication(),
excludedTables);
- List<Identifier> monitoredTables =
+ monitoredTables =
beforeMerging.stream().map(MySqlSchema::identifier).collect(Collectors.toList());
- List<MySqlSchema> mySqlSchemas = mergeShards(beforeMerging);
+ List<MySqlSchema> mySqlSchemas = mergeShards ?
mergeShards(beforeMerging) : beforeMerging;
checkArgument(
mySqlSchemas.size() > 0,
@@ -184,12 +191,11 @@ public class MySqlSyncDatabaseAction extends ActionBase {
catalog.createDatabase(database, true);
TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, tablePrefix,
tableSuffix);
+ new TableNameConverter(caseSensitive, mergeShards,
tablePrefix, tableSuffix);
List<FileStoreTable> fileStoreTables = new ArrayList<>();
for (MySqlSchema mySqlSchema : mySqlSchemas) {
- String paimonTableName =
tableNameConverter.convert(mySqlSchema.tableName());
- Identifier identifier = new Identifier(database, paimonTableName);
+ Identifier identifier = buildPaimonIdentifier(tableNameConverter,
mySqlSchema);
FileStoreTable table;
Schema fromMySql =
MySqlActionUtils.buildPaimonSchema(
@@ -206,7 +212,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
fileStoreTables.add(table);
} else {
- monitoredTables.remove(mySqlSchema.identifier());
+ unmonitor(mySqlSchema);
}
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromMySql, false);
@@ -221,8 +227,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+ "MySQL database are not compatible with those of
existed Paimon tables. Please check the log.");
MySqlSource<String> source =
- MySqlActionUtils.buildMySqlSource(
- mySqlConfig, buildTableList(monitoredTables,
excludedTables));
+ MySqlActionUtils.buildMySqlSource(mySqlConfig,
buildTableList(excludedTables));
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
@@ -328,7 +333,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+ "MySQL table is: %s, fields are: %s.\n",
identifier.getFullName(),
paimonSchema.fields(),
- mySqlSchema.identifier(),
+ mySqlSchema.tableName(),
mySqlSchema.fields());
}
@@ -347,12 +352,42 @@ public class MySqlSyncDatabaseAction extends ActionBase {
return new ArrayList<>(schemaMap.values());
}
+ private Identifier buildPaimonIdentifier(
+ TableNameConverter tableNameConverter, MySqlSchema mySqlSchema) {
+ String tableName;
+ if (mergeShards) {
+ tableName = tableNameConverter.convert(mySqlSchema.tableName());
+ } else {
+ // the Paimon table name should be composed of the origin database
name and table name
+ // together to avoid name conflicts
+ tableName = tableNameConverter.convert(mySqlSchema.identifier());
+ }
+
+ return Identifier.create(database, tableName);
+ }
+
+ private void unmonitor(MySqlSchema mySqlSchema) {
+ if (mergeShards) {
+ // if schema has been merged, all shards with the same table name
should be removed
+ monitoredTables =
+ monitoredTables.stream()
+ .filter(id ->
!id.getObjectName().equals(mySqlSchema.tableName()))
+ .collect(Collectors.toList());
+ } else {
+ monitoredTables.remove(mySqlSchema.identifier());
+ }
+ }
+
+ @VisibleForTesting
+ public List<Identifier> monitoredTables() {
+ return monitoredTables;
+ }
+
/**
* See {@link
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
* and {@code MySqlSyncDatabaseTableListITCase}.
*/
- private String buildTableList(
- List<Identifier> monitoredTables, List<Identifier> excludedTables)
{
+ private String buildTableList(List<Identifier> excludedTables) {
String separatorRex = "\\.";
if (mode == DIVIDED) {
// In DIVIDED mode, we only concern about existed tables
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index 0a6723bf7..dd9938772 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -49,6 +49,8 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
String warehouse = params.get("warehouse");
String database = params.get("database");
boolean ignoreIncompatible =
Boolean.parseBoolean(params.get("ignore-incompatible"));
+ boolean mergeShards =
+ !params.has("merge-shards") ||
Boolean.parseBoolean(params.get("merge-shards"));
String tablePrefix = params.get("table-prefix");
String tableSuffix = params.get("table-suffix");
String includingTables = params.get("including-tables");
@@ -80,6 +82,7 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
warehouse,
database,
ignoreIncompatible,
+ mergeShards,
tablePrefix,
tableSuffix,
includingTables,
@@ -103,6 +106,7 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
System.out.println(
" mysql-sync-database --warehouse <warehouse-path> --database
<database-name> "
+ "[--ignore-incompatible <true/false>] "
+ + "[--merge-shards <true/false>] "
+ "[--table-prefix <paimon-table-prefix>] "
+ "[--table-suffix <paimon-table-suffix>] "
+ "[--including-tables
<mysql-table-name|name-regular-expr>] "
@@ -119,6 +123,13 @@ public class MySqlSyncDatabaseActionFactory implements
ActionFactory {
+ "You can specify it to true explicitly to ignore the
incompatible tables and exception.");
System.out.println();
+ System.out.println(
+ "--merge-shards is default true, in this case, if some tables
in different databases have the same name, "
+ + "their schemas will be merged and their records will
be synchronized into one Paimon table. "
+ + "Otherwise, each table's records will be
synchronized to a corresponding Paimon table, "
+ + "and the Paimon table will be named to
'databaseName_tableName' to avoid potential name conflict.");
+ System.out.println();
+
System.out.println(
"--table-prefix is the prefix of all Paimon tables to be
synchronized. For example, if you want all "
+ "synchronized tables to have \"ods_\" as prefix, you
can specify `--table-prefix ods_`.");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index b17e4fa12..9d23ebc6e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -34,6 +34,10 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
@@ -77,6 +81,15 @@ public class MySqlActionITCaseBase extends ActionITCaseBase {
LOG.info("Containers are started.");
}
+ protected Statement getStatement() throws SQLException {
+ Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ return conn.createStatement();
+ }
+
protected void waitForResult(
List<String> expected, FileStoreTable table, RowType rowType,
List<String> primaryKeys)
throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
index 023f1d4e7..21ef8d145 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
@@ -92,20 +92,10 @@ public class MySqlContainer extends JdbcDatabaseContainer {
}
}
- public String getJdbcUrl(String databaseName) {
- String additionalUrlParams = constructUrlParameters("?", "&");
- return "jdbc:mysql://"
- + getHost()
- + ":"
- + getDatabasePort()
- + "/"
- + databaseName
- + additionalUrlParams;
- }
-
@Override
public String getJdbcUrl() {
- return getJdbcUrl(databaseName);
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://" + getHost() + ":" + getDatabasePort() +
additionalUrlParams;
}
public int getDatabasePort() {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 9ae121d68..eeb76d8be 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -45,8 +45,6 @@ import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -68,12 +66,6 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for {@link MySqlSyncDatabaseAction}. */
public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
- private static final String DATABASE_NAME = "paimon_sync_database";
-
- private static final String DATABASE_NAME_TINYINT_CONVERT =
- "paimon_sync_database_tinyint_schema";
-
- private static final String DATABASE_NAME_TINYINT =
"paimon_sync_database_tinyint";
@TempDir java.nio.file.Path tempDir;
@BeforeAll
@@ -86,7 +78,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Timeout(60)
public void testSchemaEvolution() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("database-name", "paimon_sync_database");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -106,14 +98,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testSchemaEvolutionImpl(statement);
- }
+ try (Statement statement = getStatement()) {
+ testSchemaEvolutionImpl(statement);
}
}
@@ -234,7 +220,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Timeout(60)
public void testSchemaEvolutionWithTinyInt1Convert() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", DATABASE_NAME_TINYINT_CONVERT);
+ mySqlConfig.put("database-name",
"paimon_sync_database_tinyint_schema");
mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -255,14 +241,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testSchemaEvolutionImplWithTinyInt1Convert(statement);
- }
+ try (Statement statement = getStatement()) {
+ testSchemaEvolutionImplWithTinyInt1Convert(statement);
}
}
@@ -270,7 +250,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
FileStoreTable table1 = getFileStoreTable("schema_evolution_4");
FileStoreTable table2 = getFileStoreTable("schema_evolution_5");
- statement.executeUpdate("USE " + DATABASE_NAME_TINYINT_CONVERT);
+ statement.executeUpdate("USE " +
"paimon_sync_database_tinyint_schema");
statement.executeUpdate("INSERT INTO schema_evolution_4 VALUES (1,
'one')");
statement.executeUpdate("INSERT INTO schema_evolution_5 VALUES (2,
'two', 21)");
@@ -330,7 +310,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Test
public void testSpecifiedMySqlTable() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("database-name", "paimon_sync_database");
mySqlConfig.put("table-name", "my_table");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -410,12 +390,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
waitJobRunning(client);
// validate `compatible` can be synchronized
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
FileStoreTable table = getFileStoreTable("compatible");
statement.executeUpdate("USE
paimon_sync_database_ignore_incompatible");
@@ -468,6 +443,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ true,
"test_prefix_",
"_test_suffix",
null,
@@ -479,12 +455,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
testTableAffixImpl(statement);
}
}
@@ -643,6 +614,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ true,
null,
null,
includingTables,
@@ -690,12 +662,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
+
"{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
// check sync schema changes and records
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
statement.executeUpdate("USE paimon_ignore_CASE");
statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi')");
RowType rowType1 =
@@ -767,6 +734,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
@Test
+ @Timeout(60)
public void testAddIgnoredTable() throws Exception {
String mySqlDatabase = "paimon_sync_database_add_ignored_table";
@@ -785,6 +753,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ true,
null,
null,
"t.+",
@@ -796,13 +765,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(mySqlDatabase),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
-
+ try (Statement statement = getStatement()) {
FileStoreTable table1 = getFileStoreTable("t1");
statement.executeUpdate("USE " + mySqlDatabase);
@@ -858,20 +821,14 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
buildSyncDatabaseActionWithNewlyAddedTables(databaseName,
testSchemaChange);
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(databaseName),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testNewlyAddedTableImpl(
- client,
- statement,
- numOfNewlyAddedTables,
- testSavepointRecovery,
- testSchemaChange,
- databaseName);
- }
+ try (Statement statement = getStatement()) {
+ testNewlyAddedTableImpl(
+ client,
+ statement,
+ numOfNewlyAddedTables,
+ testSavepointRecovery,
+ testSchemaChange,
+ databaseName);
}
}
@@ -1097,6 +1054,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ true,
null,
null,
"t.+",
@@ -1120,7 +1078,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Timeout(60)
public void testTinyInt1Convert() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", DATABASE_NAME_TINYINT);
+ mySqlConfig.put("database-name", "paimon_sync_database_tinyint");
mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1141,14 +1099,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testTinyInt1Convert(statement);
- }
+ try (Statement statement = getStatement()) {
+ testTinyInt1Convert(statement);
}
}
@@ -1203,6 +1155,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ true,
null,
null,
null,
@@ -1215,12 +1168,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(databaseName),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE " + databaseName);
// wait checkpointing to step into incremental phase
Thread.sleep(2_000);
@@ -1268,6 +1217,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ true,
null,
null,
null,
@@ -1279,12 +1229,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
// test insert into t1
statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES
(1, 'db1_1')");
statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES
(2, 'db1_2')");
@@ -1352,7 +1297,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
rowType,
Collections.singletonList("k"));
- // test newly added table
+ // test newly created table
if (mode == COMBINED) {
statement.executeUpdate(
"CREATE TABLE database_shard_1.t4 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
@@ -1381,6 +1326,178 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
}
+ @Test
+ @Timeout(60)
+ public void testSyncMultipleShardsWithoutMerging() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "without_merging_shard_.*"); // every without_merging_shard_* db
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ false, // presumably mergeShards = false: keep shards as separate tables
+ null,
+ null,
+ null,
+ null,
+ Collections.emptyMap(),
+ tableConfig,
+ mode);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Statement statement = getStatement()) {
+ Thread.sleep(5_000); // fixed wait for the job to create the initial tables
+
+ Catalog catalog = catalog();
+ assertThat(catalog.listTables(database))
+ .containsExactlyInAnyOrder(
+ "without_merging_shard_1_t1",
+ "without_merging_shard_1_t2",
+ "without_merging_shard_2_t1"); // shard_2.t2 has no primary key
+
+ // without merging, MySQL table db.tbl is synced to Paimon table db_tbl
+ statement.executeUpdate(
+ "INSERT INTO without_merging_shard_1.t1 VALUES (1,
'db1_1'), (2, 'db1_2')");
+ FileStoreTable table =
getFileStoreTable("without_merging_shard_1_t1");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ waitForResult(
+ Arrays.asList("+I[1, db1_1]", "+I[2, db1_2]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ // shard_2.t1 has a different schema from shard_1.t1 and gets its own table
+ statement.executeUpdate(
+ "INSERT INTO without_merging_shard_2.t1 VALUES (3,
'db2_3', 300), (4, 'db2_4', 400)");
+ table = getFileStoreTable("without_merging_shard_2_t1");
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
DataTypes.VARCHAR(20), DataTypes.BIGINT()
+ },
+ new String[] {"k", "v1", "v2"});
+ waitForResult(
+ Arrays.asList("+I[3, db2_3, 300]", "+I[4, db2_4, 400]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ // ADD COLUMN on shard_1.t2 must add field v2 to without_merging_shard_1_t2
+ statement.executeUpdate("ALTER TABLE without_merging_shard_1.t2
ADD COLUMN v2 DOUBLE");
+ statement.executeUpdate(
+ "INSERT INTO without_merging_shard_1.t2 VALUES (1,
'Apache', 1.1)");
+ statement.executeUpdate(
+ "INSERT INTO without_merging_shard_1.t2 VALUES (2,
'Paimon', 2.2)");
+ table = getFileStoreTable("without_merging_shard_1_t2");
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
DataTypes.VARCHAR(10), DataTypes.DOUBLE()
+ },
+ new String[] {"k", "v1", "v2"});
+ waitForResult(
+ Arrays.asList("+I[1, Apache, 1.1]", "+I[2, Paimon, 2.2]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ // tables created after startup are only checked in COMBINED mode
+ if (mode == COMBINED) {
+ statement.executeUpdate(
+ "CREATE TABLE without_merging_shard_1.t3 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
+ statement.executeUpdate(
+ "INSERT INTO without_merging_shard_1.t3 VALUES (1,
'test')");
+
+ statement.executeUpdate(
+ "CREATE TABLE without_merging_shard_2.t3 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
+ statement.executeUpdate(
+ "INSERT INTO without_merging_shard_2.t3 VALUES (2,
'test')");
+
+ while (!catalog.listTables(database)
+ .containsAll(
+ Arrays.asList(
+ "without_merging_shard_1_t3",
+ "without_merging_shard_2_t3"))) {
+ Thread.sleep(100); // poll until both new tables exist
+ }
+
+ table = getFileStoreTable("without_merging_shard_1_t3");
+ rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, test]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ table = getFileStoreTable("without_merging_shard_2_t3"); // same schema, reuse rowType
+ waitForResult(
+ Collections.singletonList("+I[2, test]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+ }
+ }
+ }
+
+ @Test
+ public void testUnmonitorTablesWithMergingShards() throws Exception {
+ // pre-create Paimon t2 with a STRING key; the MySQL shards' t2 use INT keys
+ Catalog catalog = catalog();
+ catalog.createDatabase(database, true);
+ Identifier identifier = Identifier.create(database, "t2");
+ Schema schema =
+ Schema.newBuilder()
+ .column("k", DataTypes.STRING())
+ .column("v1", DataTypes.STRING())
+ .primaryKey("k")
+ .build();
+ catalog.createTable(identifier, schema, false);
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "test_unmonitor_table_shard_.*");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ true, // presumably ignoreIncompatible: skip t2 instead of failing
+ true, // presumably mergeShards
+ null,
+ null,
+ null,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ COMBINED);
+ action.build(env);
+
+ assertThat(action.monitoredTables()) // incompatible t2 must be excluded
+ .containsOnly(
+ Identifier.create("test_unmonitor_table_shard_1",
"t1"),
+ Identifier.create("test_unmonitor_table_shard_2",
"t1"));
+ }
+
private void assertTableExists(List<String> tableNames) {
Catalog catalog = catalog();
for (String tableName : tableNames) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index 674c79456..ed370b5c0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -20,9 +20,6 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
@@ -31,12 +28,8 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.Statement;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -53,7 +46,6 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
start();
}
- // TODO it's more convenient to check table without merging shards
@Test
@Timeout(60)
public void testActionRunResult() throws Exception {
@@ -73,6 +65,7 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
warehouse,
database,
false,
+ false,
null,
null,
"t.+|s.+",
@@ -84,69 +77,21 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl("shard_1"),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
+ Thread.sleep(5_000);
Catalog catalog = catalog();
assertThat(catalog.listTables(database))
- .containsExactlyInAnyOrder("t1", "t11", "t2", "t22", "t3",
"taa", "tb", "s2");
-
- RowType rowType =
- RowType.of(
- new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(100)},
- new String[] {"k", "name"});
- List<String> pk = Collections.singletonList("k");
-
- waitForResult(
- Arrays.asList("+I[2, shard_2.t1]", "+I[3, x_shard_1.t1]"),
- getFileStoreTable("t1"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[1, shard_1.t11]"),
- getFileStoreTable("t11"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[1, shard_1.t2]"),
- getFileStoreTable("t2"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[2, shard_2.t22]"),
- getFileStoreTable("t22"),
- rowType,
- pk);
-
- waitForResult(
- Arrays.asList("+I[1, shard_1.t3]", "+I[2, shard_2.t3]"),
- getFileStoreTable("t3"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[1, shard_1.taa]"),
- getFileStoreTable("taa"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[2, shard_2.tb]"),
- getFileStoreTable("tb"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[1, shard_1.s2]"),
- getFileStoreTable("s2"),
- rowType,
- pk);
+ .containsExactlyInAnyOrder(
+ "shard_1_t11",
+ "shard_1_t2",
+ "shard_1_t3",
+ "shard_1_taa",
+ "shard_1_s2",
+ "shard_2_t1",
+ "shard_2_t22",
+ "shard_2_t3",
+ "shard_2_tb",
+ "x_shard_1_t1");
// test newly added tables
if (mode == COMBINED) {
@@ -155,12 +100,9 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
// ignored: ta
statement.executeUpdate(
"CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- statement.executeUpdate("INSERT INTO ta VALUES (10,
'shard_2.ta')");
-
// captured: s3
statement.executeUpdate(
"CREATE TABLE s3 (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- statement.executeUpdate("INSERT INTO s3 VALUES (10,
'shard_2.s3')");
// case 2: new tables in new captured database
statement.executeUpdate("CREATE DATABASE shard_3");
@@ -168,16 +110,11 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
// ignored: ta, m
statement.executeUpdate(
"CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- statement.executeUpdate("INSERT INTO ta VALUES (10,
'shard_3.ta')");
-
statement.executeUpdate(
"CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY
(k))");
- statement.executeUpdate("INSERT INTO m VALUES (10,
'shard_3.m')");
-
// captured: tab
statement.executeUpdate(
"CREATE TABLE tab (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- statement.executeUpdate("INSERT INTO tab VALUES (10,
'shard_3.tab')");
// case 3: new tables in new ignored database
statement.executeUpdate("CREATE DATABASE what");
@@ -185,30 +122,28 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
// ignored: ta
statement.executeUpdate(
"CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- statement.executeUpdate("INSERT INTO ta VALUES (10,
'what.ta')");
-
// match including pattern but ignored: s4
statement.executeUpdate(
"CREATE TABLE s4 (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- statement.executeUpdate("INSERT INTO s4 VALUES (10,
'what.s4')");
Thread.sleep(5_000);
assertThat(catalog.listTables(database))
.containsExactlyInAnyOrder(
- "t1", "t11", "t2", "t22", "t3", "taa", "tb",
"s2", "s3", "tab");
-
- waitForResult(
- Collections.singletonList("+I[10, shard_2.s3]"),
- getFileStoreTable("s3"),
- rowType,
- pk);
-
- waitForResult(
- Collections.singletonList("+I[10, shard_3.tab]"),
- getFileStoreTable("tab"),
- rowType,
- pk);
+ // old
+ "shard_1_t11",
+ "shard_1_t2",
+ "shard_1_t3",
+ "shard_1_taa",
+ "shard_1_s2",
+ "shard_2_t1",
+ "shard_2_t22",
+ "shard_2_t3",
+ "shard_2_tb",
+ "x_shard_1_t1",
+ // new
+ "shard_2_s3",
+ "shard_3_tab");
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index ef386e3ee..9261cf163 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -35,8 +35,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
@@ -91,14 +89,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
checkTableSchema(
"[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT
NOT
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testSchemaEvolutionImpl(statement);
- }
+ try (Statement statement = getStatement()) {
+ testSchemaEvolutionImpl(statement);
}
}
@@ -111,7 +103,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
private void testSchemaEvolutionImpl(Statement statement) throws Exception
{
FileStoreTable table = getFileStoreTable();
- statement.executeUpdate("USE paimon_sync_table");
+ statement.executeUpdate("USE " + DATABASE_NAME);
statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1,
'one')");
statement.executeUpdate(
@@ -282,20 +274,14 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
checkTableSchema(
"[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testSchemaEvolutionMultipleImpl(statement);
- }
+ try (Statement statement = getStatement()) {
+ testSchemaEvolutionMultipleImpl(statement);
}
}
private void testSchemaEvolutionMultipleImpl(Statement statement) throws
Exception {
FileStoreTable table = getFileStoreTable();
- statement.executeUpdate("USE paimon_sync_table");
+ statement.executeUpdate("USE " + DATABASE_NAME);
statement.executeUpdate(
"INSERT INTO schema_evolution_multiple VALUES (1, 'one', 10,
'string_1')");
@@ -377,14 +363,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testAllTypesImpl(statement);
- }
+ try (Statement statement = getStatement()) {
+ testAllTypesImpl(statement);
}
client.cancel().get();
@@ -632,6 +612,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
// test all types during schema evolution
try {
+ statement.executeUpdate("USE " + DATABASE_NAME);
statement.executeUpdate("ALTER TABLE all_types_table ADD COLUMN v
INT");
List<DataField> newFields = new ArrayList<>(rowType.getFields());
newFields.add(new DataField(rowType.getFieldCount(), "v",
DataTypes.INT()));
@@ -811,13 +792,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
waitJobRunning(client);
if (executeMysql) {
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
- statement.execute("USE paimon_sync_table");
+ try (Statement statement = getStatement()) {
+ statement.execute("USE " + DATABASE_NAME);
statement.executeUpdate(
"INSERT INTO test_computed_column VALUES (1,
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
statement.executeUpdate(
@@ -905,13 +881,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
- statement.execute("USE paimon_sync_table");
+ try (Statement statement = getStatement()) {
+ statement.execute("USE " + DATABASE_NAME);
statement.executeUpdate(
"INSERT INTO test_tinyint1_convert VALUES (1, '2021-09-15
15:00:10', 21)");
statement.executeUpdate(
@@ -965,14 +936,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
checkTableSchema(
"[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT
NOT
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword())) {
- try (Statement statement = conn.createStatement()) {
- testSchemaEvolutionImplWithTinyIntConvert(statement);
- }
+ try (Statement statement = getStatement()) {
+ testSchemaEvolutionImplWithTinyIntConvert(statement);
}
}
@@ -1043,12 +1008,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
+ try (Statement statement = getStatement()) {
statement.execute("USE shard_1");
statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30'),
(2, '2023-07-30')");
statement.execute("USE shard_2");
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 46ac60d9f..59c720c06 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -347,6 +347,10 @@ CREATE TABLE a (
PRIMARY KEY (k)
);
+--
################################################################################
+-- testSyncMultipleShards
+--
################################################################################
+
CREATE DATABASE database_shard_1;
USE database_shard_1;
@@ -391,3 +395,74 @@ CREATE TABLE t3 (
k INT,
v1 VARCHAR(10)
);
+
+--
################################################################################
+-- testSyncMultipleShardsWithoutMerging
+--
################################################################################
+
+CREATE DATABASE without_merging_shard_1;
+USE without_merging_shard_1;
+
+CREATE TABLE t1 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE DATABASE without_merging_shard_2;
+USE without_merging_shard_2;
+
+CREATE TABLE t1 ( -- schema intentionally differs from without_merging_shard_1.t1
+ k INT,
+ v1 VARCHAR(20),
+ v2 BIGINT,
+ PRIMARY KEY (k)
+);
+
+-- t2 has no primary key in this shard, so it is expected to be ignored
+CREATE TABLE t2 (
+ k INT,
+ v1 VARCHAR(10)
+);
+
+--
################################################################################
+-- testUnmonitorTablesWithMergingShards
+--
################################################################################
+
+CREATE DATABASE test_unmonitor_table_shard_1;
+USE test_unmonitor_table_shard_1;
+
+CREATE TABLE t1 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+
+CREATE DATABASE test_unmonitor_table_shard_2;
+USE test_unmonitor_table_shard_2;
+
+CREATE TABLE t1 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 ( -- columns differ from test_unmonitor_table_shard_1.t2
+ k INT,
+ v2 DOUBLE,
+ PRIMARY KEY (k)
+);
+
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
index 1237134f2..ea49883dd 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
@@ -37,58 +37,28 @@ CREATE DATABASE ignored;
USE shard_1;
CREATE TABLE t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
-INSERT INTO t1 VALUES (1, 'shard_1.t1');
-
CREATE TABLE t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t11 VALUES (1, 'shard_1.t11');
-
CREATE TABLE t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t2 VALUES (1, 'shard_1.t2');
-
CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t3 VALUES (1, 'shard_1.t3');
-
CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
-INSERT INTO ta VALUES (1, 'shard_1.ta');
-
CREATE TABLE taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO taa VALUES (1, 'shard_1.taa');
-
CREATE TABLE s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
-INSERT INTO s1 VALUES (1, 'shard_1.s1');
-
CREATE TABLE s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO s2 VALUES (1, 'shard_1.s2');
-
CREATE TABLE sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
-INSERT INTO sa VALUES (1, 'shard_1.sa');
-
CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
-INSERT INTO m VALUES (1, 'shard_1.m');
USE shard_2;
CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t1 VALUES (2, 'shard_2.t1');
-
CREATE TABLE t2 (k INT, name VARCHAR(100)); -- ignored because of pk absence
-INSERT INTO t2 VALUES (2, 'shard_2.t2');
-
CREATE TABLE t22 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t22 VALUES (2, 'shard_2.t22');
-
CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t3 VALUES (2, 'shard_2.t3');
-
CREATE TABLE tb (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO tb VALUES (2, 'shard_2.tb');
USE x_shard_1;
CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
-INSERT INTO t1 VALUES (3, 'x_shard_1.t1');
USE ignored;
CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
-INSERT INTO t1 VALUES (4, 'ignored.t1');