This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf77f183 [flink][mysql-cdc] Support Synchronizing multiple shards
into one database (#1674)
4bf77f183 is described below
commit 4bf77f18377d4783868723d897eb4ffce44bb1d1
Author: yuzelin <[email protected]>
AuthorDate: Mon Jul 31 16:57:26 2023 +0800
[flink][mysql-cdc] Support Synchronizing multiple shards into one database
(#1674)
---
docs/content/how-to/cdc-ingestion.md | 31 +++
.../flink/action/cdc/mysql/MySqlActionUtils.java | 61 ++++--
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 20 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 158 +++++++++------
.../action/cdc/mysql/MySqlSyncTableAction.java | 58 ++----
.../action/cdc/mysql/MySqlActionITCaseBase.java | 6 +
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 146 +++++++++++++-
.../mysql/MySqlSyncDatabaseTableListITCase.java | 215 +++++++++++++++++++++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 6 +-
.../test/resources/mysql/sync_database_setup.sql | 45 +++++
.../test/resources/mysql/tablelist_test_setup.sql | 94 +++++++++
11 files changed, 703 insertions(+), 137 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 57d3e06a4..b16e0e7b6 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -196,6 +196,37 @@ The command to recover from previous snapshot and add new
tables to synchronize
--including-tables 'product|user|address|order|custom'
```
+{{< hint info >}}
+You can set `--mode combined` to enable synchronizing newly added tables without restarting the job.
+{{< /hint >}}
+
+Example 3: synchronize and merge multiple shards
+
+Suppose you have multiple database shards `db1`, `db2`, ..., and each database has tables `tbl1`, `tbl2`, .... You can
+synchronize all tables matching `db.+.tbl.+` into tables `test_db.tbl1`, `test_db.tbl2`, ... with the following command:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ mysql-sync-database \
+ --warehouse hdfs:///path/to/warehouse \
+ --database test_db \
+ --mysql-conf hostname=127.0.0.1 \
+ --mysql-conf username=root \
+ --mysql-conf password=123456 \
+ --mysql-conf database-name='db.+' \
+ --catalog-conf metastore=hive \
+ --catalog-conf uri=thrift://hive-metastore:9083 \
+ --table-conf bucket=4 \
+ --table-conf changelog-producer=input \
+ --table-conf sink.parallelism=4 \
+ --including-tables 'tbl.+'
+```
+
+By setting `database-name` to a regular expression, the synchronization job will capture all tables under the matched databases
+and merge tables with the same name into one table.
+
+
## Kafka
### Prepare Kafka Bundled Jar
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 756b94279..b2a89f03d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
@@ -44,7 +45,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +55,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -98,6 +104,44 @@ public class MySqlActionUtils {
mySqlConfig.get(MySqlSourceOptions.PASSWORD));
}
+    /**
+     * Collects the schemas of the tables located in every database whose name fully matches the
+     * configured {@code database-name} regular expression.
+     *
+     * @param mySqlConfig MySQL connection configuration; {@code database-name} is used as a regex
+     * @param monitorTablePredication decides whether a discovered table should be synchronized
+     * @param excludedTables output list that receives the identifiers of rejected tables
+     * @return schemas of the tables accepted by the predicate, in discovery order
+     */
+    static List<MySqlSchema> getMySqlSchemaList(
+            Configuration mySqlConfig,
+            Predicate<MySqlSchema> monitorTablePredication,
+            List<Identifier> excludedTables)
+            throws Exception {
+        Pattern databasePattern =
+                Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
+        List<MySqlSchema> accepted = new ArrayList<>();
+        try (Connection connection = MySqlActionUtils.getConnection(mySqlConfig)) {
+            DatabaseMetaData metaData = connection.getMetaData();
+            try (ResultSet catalogs = metaData.getCatalogs()) {
+                while (catalogs.next()) {
+                    String databaseName = catalogs.getString("TABLE_CAT");
+                    Matcher databaseMatcher = databasePattern.matcher(databaseName);
+                    if (!databaseMatcher.matches()) {
+                        // database not covered by the configured pattern
+                        continue;
+                    }
+                    try (ResultSet tables = metaData.getTables(databaseName, null, "%", null)) {
+                        while (tables.next()) {
+                            MySqlSchema candidate =
+                                    new MySqlSchema(
+                                            metaData,
+                                            databaseName,
+                                            tables.getString("TABLE_NAME"),
+                                            mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
+                            if (monitorTablePredication.test(candidate)) {
+                                accepted.add(candidate);
+                            } else {
+                                excludedTables.add(candidate.identifier());
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return accepted;
+    }
+
static void assertSchemaCompatible(TableSchema paimonSchema, Schema
mySqlSchema) {
if (!schemaCompatible(paimonSchema, mySqlSchema)) {
throw new IllegalArgumentException(
@@ -154,8 +198,8 @@ public class MySqlActionUtils {
checkArgument(
!mySqlFields.containsKey(fieldName.toLowerCase()),
String.format(
- "Duplicate key '%s' in table '%s.%s' appears
when converting fields map keys to case-insensitive form.",
- fieldName, mySqlSchema.databaseName(),
mySqlSchema.tableName()));
+ "Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
+ fieldName, mySqlSchema.identifier()));
mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
}
mySqlPrimaryKeys =
@@ -199,19 +243,17 @@ public class MySqlActionUtils {
return builder.build();
}
- static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
+ static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig,
String tableList) {
validateMySqlConfig(mySqlConfig);
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
- String databaseName =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
- String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
sourceBuilder
.hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
.port(mySqlConfig.get(MySqlSourceOptions.PORT))
.username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
.password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
- .databaseList(databaseName)
- .tableList(databaseName + "." + tableName);
+
.databaseList(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME))
+ .tableList(tableList);
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
mySqlConfig
@@ -317,11 +359,6 @@ public class MySqlActionUtils {
String.format(
"mysql-conf [%s] must be specified.",
MySqlSourceOptions.DATABASE_NAME.key()));
-
- checkArgument(
- mySqlConfig.get(MySqlSourceOptions.TABLE_NAME) != null,
- String.format(
- "mysql-conf [%s] must be specified.",
MySqlSourceOptions.TABLE_NAME.key()));
}
static List<ComputedColumn> buildComputedColumns(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index a60733e10..c88637537 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;
@@ -40,17 +41,6 @@ public class MySqlSchema {
private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
private final List<String> primaryKeys;
- public MySqlSchema(
- String databaseName,
- String tableName,
- LinkedHashMap<String, Tuple2<DataType, String>> fields,
- List<String> primaryKeys) {
- this.databaseName = databaseName;
- this.tableName = tableName;
- this.fields = fields;
- this.primaryKeys = primaryKeys;
- }
-
public MySqlSchema(
DatabaseMetaData metaData,
String databaseName,
@@ -93,14 +83,14 @@ public class MySqlSchema {
}
}
- public String databaseName() {
- return databaseName;
- }
-
public String tableName() {
return tableName;
}
+ /** Returns the identifier (MySQL database name and table name) of this source table. */
+ public Identifier identifier() {
+ return Identifier.create(databaseName, tableName);
+ }
+
public LinkedHashMap<String, Tuple2<DataType, String>> fields() {
return fields;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index c931fca31..048d9deb2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -42,15 +42,13 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -170,13 +168,18 @@ public class MySqlSyncDatabaseAction extends ActionBase {
validateCaseInsensitive();
}
- List<String> excludedTables = new LinkedList<>();
- List<MySqlSchema> mySqlSchemas = getMySqlSchemaList(excludedTables);
- String mySqlDatabase =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+ List<Identifier> excludedTables = new ArrayList<>();
+ List<MySqlSchema> beforeMerging =
+ MySqlActionUtils.getMySqlSchemaList(
+ mySqlConfig, monitorTablePredication(),
excludedTables);
+ List<Identifier> monitoredTables =
+
beforeMerging.stream().map(MySqlSchema::identifier).collect(Collectors.toList());
+ List<MySqlSchema> mySqlSchemas = mergeShards(beforeMerging);
+
checkArgument(
mySqlSchemas.size() > 0,
"No tables found in MySQL database "
- + mySqlDatabase
+ + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
+ ", or MySQL database does not exist.");
catalog.createDatabase(database, true);
@@ -184,7 +187,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
new TableNameConverter(caseSensitive, tablePrefix,
tableSuffix);
List<FileStoreTable> fileStoreTables = new ArrayList<>();
- List<String> monitoredTables = new ArrayList<>();
for (MySqlSchema mySqlSchema : mySqlSchemas) {
String paimonTableName =
tableNameConverter.convert(mySqlSchema.tableName());
Identifier identifier = new Identifier(database, paimonTableName);
@@ -202,13 +204,13 @@ public class MySqlSyncDatabaseAction extends ActionBase {
Supplier<String> errMsg =
incompatibleMessage(table.schema(), mySqlSchema,
identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
- monitoredTables.add(mySqlSchema.tableName());
fileStoreTables.add(table);
+ } else {
+ monitoredTables.remove(mySqlSchema.identifier());
}
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromMySql, false);
table = (FileStoreTable) catalog.getTable(identifier);
- monitoredTables.add(mySqlSchema.tableName());
fileStoreTables.add(table);
}
}
@@ -217,22 +219,10 @@ public class MySqlSyncDatabaseAction extends ActionBase {
!monitoredTables.isEmpty(),
"No tables to be synchronized. Possible cause is the schemas
of all tables in specified "
+ "MySQL database are not compatible with those of
existed Paimon tables. Please check the log.");
- String tableList;
- if (mode == COMBINED) {
- // First excluding all tables that failed the excludingPattern and
don't have primary
- // keys. Then including other tables using regex so that newly
added table DDLs and DMLs
- // during job runtime can be captured
- tableList =
- excludedTables.stream()
- .map(t -> String.format("(?!(%s))", t))
- .collect(Collectors.joining(""))
- + includingTables;
- } else {
- tableList = "(" + String.join("|", monitoredTables) + ")";
- }
- mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, tableList);
- MySqlSource<String> source =
MySqlActionUtils.buildMySqlSource(mySqlConfig);
+ MySqlSource<String> source =
+ MySqlActionUtils.buildMySqlSource(
+ mySqlConfig, buildTableList(monitoredTables,
excludedTables));
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
@@ -291,35 +281,17 @@ public class MySqlSyncDatabaseAction extends ActionBase {
tableSuffix));
}
- private List<MySqlSchema> getMySqlSchemaList(List<String> excludedTables)
throws Exception {
- String databaseName =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
- List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
- try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
- DatabaseMetaData metaData = conn.getMetaData();
- try (ResultSet tables =
- metaData.getTables(databaseName, null, "%", new String[]
{"TABLE"})) {
- while (tables.next()) {
- String tableName = tables.getString("TABLE_NAME");
- if (!shouldMonitorTable(tableName)) {
- excludedTables.add(tableName);
- continue;
- }
- MySqlSchema mySqlSchema =
- new MySqlSchema(
- metaData,
- databaseName,
- tableName,
-
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
- if (mySqlSchema.primaryKeys().size() > 0) {
- // only tables with primary keys will be considered
- mySqlSchemaList.add(mySqlSchema);
- } else {
- excludedTables.add(tableName);
- }
- }
+ /**
+ * Returns the predicate deciding which discovered MySQL tables are synchronized: a table is
+ * accepted only if it has primary keys and its name passes {@link #shouldMonitorTable(String)}.
+ * Tables without primary keys are rejected with a debug log message.
+ */
+ private Predicate<MySqlSchema> monitorTablePredication() {
+ return schema -> {
+ if (schema.primaryKeys().isEmpty()) {
+ LOG.debug(
+ "Didn't find primary keys from table '{}'. "
+ + "This table won't be synchronized.",
+ schema.identifier());
+ return false;
+ }
- }
- return mySqlSchemaList;
+ return shouldMonitorTable(schema.tableName());
+ };
}
private boolean shouldMonitorTable(String mySqlTableName) {
@@ -327,7 +299,9 @@ public class MySqlSyncDatabaseAction extends ActionBase {
if (excludingPattern != null) {
shouldMonitor = shouldMonitor &&
!excludingPattern.matcher(mySqlTableName).matches();
}
- LOG.debug("Source table {} is monitored? {}", mySqlTableName,
shouldMonitor);
+ if (!shouldMonitor) {
+ LOG.debug("Source table '{}' is excluded.", mySqlTableName);
+ }
return shouldMonitor;
}
@@ -351,14 +325,82 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String.format(
"Incompatible schema found.\n"
+ "Paimon table is: %s, fields are: %s.\n"
- + "MySQL table is: %s.%s, fields are: %s.\n",
+ + "MySQL table is: %s, fields are: %s.\n",
identifier.getFullName(),
paimonSchema.fields(),
- mySqlSchema.databaseName(),
- mySqlSchema.tableName(),
+ mySqlSchema.identifier(),
mySqlSchema.fields());
}
+    /**
+     * Merges the schemas of tables that share the same table name (for example, the same table
+     * living in several database shards) into one schema per table name.
+     *
+     * <p>NOTE(review): which database name the merged schema's {@code identifier()} reports depends
+     * on {@code MySqlSchema#merge}; presumably it is the first shard's — confirm before using it to
+     * remove entries from the monitored-table list.
+     *
+     * @param rawMySqlSchemas schemas of all discovered tables, possibly from several databases
+     * @return one (merged) schema per distinct table name
+     */
+    private List<MySqlSchema> mergeShards(List<MySqlSchema> rawMySqlSchemas) {
+        Map<String, MySqlSchema> mergedByName = new HashMap<>();
+        for (MySqlSchema raw : rawMySqlSchemas) {
+            // first occurrence is stored as-is; later ones are folded into the stored schema
+            mergedByName.merge(raw.tableName(), raw, MySqlSchema::merge);
+        }
+        return new ArrayList<>(mergedByName.values());
+    }
+
+    /**
+     * Builds the table-list regular expression handed to the MySQL CDC source.
+     *
+     * <p>See {@link com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
+     * and {@code MySqlSyncDatabaseTableListITCase}.
+     *
+     * <p>Concrete database and table names are quoted with {@link Pattern#quote(String)} so that
+     * characters that are legal in MySQL identifiers but special in regular expressions (such as
+     * {@code $}) are matched literally. The configured database-name pattern is wrapped in a group
+     * so that an alternation such as {@code db1|db2} applies to the database part only, instead of
+     * swallowing the separator and table part.
+     *
+     * @param monitoredTables existing tables that will be synchronized
+     * @param excludedTables tables that must never be captured (only used in COMBINED mode)
+     * @return a regular expression matched against qualified {@code database.table} names
+     * @throws UnsupportedOperationException if {@code mode} is neither DIVIDED nor COMBINED
+     */
+    private String buildTableList(
+            List<Identifier> monitoredTables, List<Identifier> excludedTables) {
+        String separatorRex = "\\.";
+        if (mode == DIVIDED) {
+            // In DIVIDED mode, only tables that already exist are captured.
+            return monitoredTables.stream()
+                    .map(t -> quotedQualifiedName(t, separatorRex))
+                    .collect(Collectors.joining("|"));
+        } else if (mode == COMBINED) {
+            // In COMBINED mode, both existing tables and possibly newly added tables must be
+            // captured, so a regular expression includes all valid tables while a negative
+            // lookahead excludes certain invalid ones. The table list is built by the template:
+            //   (?!(^db\.tbl$)|(^...$))((databasePattern)\.(including_pattern1|...))
+            //
+            // The excluding part rejects tables whose qualified name is exactly 'db.tbl'; the
+            // including part accepts tables whose qualified name matches one of the patterns.
+            // A table is captured only if it matches the including part and is not rejected by
+            // the excluding part.
+            String includingPattern =
+                    String.format(
+                            "(%s)%s(%s)",
+                            mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
+                            separatorRex,
+                            includingTables);
+            if (excludedTables.isEmpty()) {
+                return includingPattern;
+            }
+
+            String excludingPattern =
+                    excludedTables.stream()
+                            .map(t -> String.format("(^%s$)", quotedQualifiedName(t, separatorRex)))
+                            .collect(Collectors.joining("|"));
+            return String.format("(?!%s)(%s)", excludingPattern, includingPattern);
+        }
+
+        throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " + mode);
+    }
+
+    /** Returns {@code <db><separator><table>} with both names quoted as regex literals. */
+    private static String quotedQualifiedName(Identifier identifier, String separatorRex) {
+        return Pattern.quote(identifier.getDatabaseName())
+                + separatorRex
+                + Pattern.quote(identifier.getObjectName());
+    }
+
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index e85d1121a..433105863 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -35,15 +35,12 @@ import
org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.regex.Matcher;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -133,7 +130,16 @@ public class MySqlSyncTableAction extends ActionBase {
}
public void build(StreamExecutionEnvironment env) throws Exception {
- MySqlSource<String> source =
MySqlActionUtils.buildMySqlSource(mySqlConfig);
+ checkArgument(
+ mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
+ String.format(
+ "mysql-conf [%s] must be specified.",
MySqlSourceOptions.TABLE_NAME.key()));
+
+ String tableList =
+ mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
+ + "\\."
+ + mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
+ MySqlSource<String> source =
MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
boolean caseSensitive = catalog.caseSensitive();
@@ -142,7 +148,9 @@ public class MySqlSyncTableAction extends ActionBase {
}
MySqlSchema mySqlSchema =
- getMySqlSchemaList().stream()
+ MySqlActionUtils.getMySqlSchemaList(
+ mySqlConfig, monitorTablePredication(), new
ArrayList<>())
+ .stream()
.reduce(MySqlSchema::merge)
.orElseThrow(
() ->
@@ -235,38 +243,12 @@ public class MySqlSyncTableAction extends ActionBase {
}
}
- private List<MySqlSchema> getMySqlSchemaList() throws Exception {
- Pattern databasePattern =
-
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
- Pattern tablePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
- List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
- try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
- DatabaseMetaData metaData = conn.getMetaData();
- try (ResultSet schemas = metaData.getCatalogs()) {
- while (schemas.next()) {
- String databaseName = schemas.getString("TABLE_CAT");
- Matcher databaseMatcher =
databasePattern.matcher(databaseName);
- if (databaseMatcher.matches()) {
- try (ResultSet tables =
metaData.getTables(databaseName, null, "%", null)) {
- while (tables.next()) {
- String tableName =
tables.getString("TABLE_NAME");
- Matcher tableMatcher =
tablePattern.matcher(tableName);
- if (tableMatcher.matches()) {
- mySqlSchemaList.add(
- new MySqlSchema(
- metaData,
- databaseName,
- tableName,
- mySqlConfig.get(
-
MYSQL_CONVERTER_TINYINT1_BOOL)));
- }
- }
- }
- }
- }
- }
- }
- return mySqlSchemaList;
+    /**
+     * Returns a predicate accepting the tables whose name fully matches the configured
+     * {@code table-name} regular expression.
+     *
+     * <p>The pattern is compiled once, when the predicate is created, rather than once for every
+     * tested table.
+     */
+    private Predicate<MySqlSchema> monitorTablePredication() {
+        Pattern tableNamePattern =
+                Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
+        return schema -> tableNamePattern.matcher(schema.tableName()).matches();
}
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index ad536a6f6..b17e4fa12 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
@@ -149,4 +150,9 @@ public class MySqlActionITCaseBase extends ActionITCaseBase
{
Thread.sleep(1000);
}
}
+
+    /** Loads the Paimon table named {@code tableName} from the test database {@code database}. */
+    protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
+        return (FileStoreTable) catalog().getTable(Identifier.create(database, tableName));
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index b1bcd88bd..6a0878ace 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -282,7 +283,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
new String[] {"_id", "v1"});
- List<String> primaryKeys1 = Arrays.asList("_id");
+ List<String> primaryKeys1 = Collections.singletonList("_id");
List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
waitForResult(expected, table1, rowType1, primaryKeys1);
@@ -293,7 +294,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
DataTypes.TINYINT()
},
new String[] {"_id", "v1", "v2"});
- List<String> primaryKeys2 = Arrays.asList("_id");
+ List<String> primaryKeys2 = Collections.singletonList("_id");
expected = Arrays.asList("+I[2, two, 21]", "+I[4, four, 24]");
waitForResult(expected, table2, rowType2, primaryKeys2);
@@ -936,7 +937,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
createNewTable(statement, newTableName);
statement.executeUpdate(
String.format("INSERT INTO `%s`.`t2` VALUES (8, 'eight', 80,
800)", databaseName));
- List<Tuple2<Integer, String>> newTableRecords =
getNewTableRecords(newTableCount);
+ List<Tuple2<Integer, String>> newTableRecords = getNewTableRecords();
recordsMap.put(newTableName, newTableRecords);
List<String> newTableExpected = getNewTableExpected(newTableRecords);
insertRecordsIntoNewTable(statement, databaseName, newTableName,
newTableRecords);
@@ -975,7 +976,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Thread.sleep(5000L);
// insert records
- newTableRecords = getNewTableRecords(newTableCount);
+ newTableRecords = getNewTableRecords();
recordsMap.put(newTableName, newTableRecords);
insertRecordsIntoNewTable(statement, databaseName, newTableName,
newTableRecords);
newTable = getFileStoreTable(newTableName);
@@ -1039,7 +1040,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
.collect(Collectors.toList());
}
- private List<Tuple2<Integer, String>> getNewTableRecords(int
newTableCount) {
+ private List<Tuple2<Integer, String>> getNewTableRecords() {
List<Tuple2<Integer, String>> records = new LinkedList<>();
int count = ThreadLocalRandom.current().nextInt(10) + 1;
for (int i = 0; i < count; i++) {
@@ -1176,7 +1177,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
new String[] {"pk", "_datetime", "_tinyint1"});
List<String> expected =
Arrays.asList("+I[1, 2021-09-15T15:00:10, 21]", "+I[2,
2023-03-23T16:00:20, 42]");
- waitForResult(expected, table, rowType, Arrays.asList("pk"));
+ waitForResult(expected, table, rowType,
Collections.singletonList("pk"));
}
@Test
@@ -1257,9 +1258,136 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
}
- private FileStoreTable getFileStoreTable(String tableName) throws
Exception {
- Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog().getTable(identifier);
+ /**
+ * Verifies that tables with the same name in every database matching {@code database_shard_.*}
+ * are merged into one Paimon table. The sync mode (DIVIDED or COMBINED) is chosen at random.
+ */
+ @Test
+ @Timeout(60)
+ public void testSyncMultipleShards() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ // database-name is a regex, so every shard database is captured
+ mySqlConfig.put("database-name", "database_shard_.*");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ // randomly exercise either mode; the newly-added-table check below only runs in COMBINED
+ DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ null,
+ null,
+ null,
+ null,
+ Collections.emptyMap(),
+ tableConfig,
+ mode);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = conn.createStatement()) {
+ // test insert into t1
+ statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES
(1, 'db1_1')");
+ statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES
(2, 'db1_2')");
+
+ statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES
(3, 'db2_3', 300)");
+ statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES
(4, 'db2_4', 400)");
+
+ // rows from both shards land in the single Paimon table t1; shard_1 rows carry no v2
+ // value, so v2 is NULL for them
+ FileStoreTable table = getFileStoreTable("t1");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
DataTypes.VARCHAR(20), DataTypes.BIGINT()
+ },
+ new String[] {"k", "v1", "v2"});
+ waitForResult(
+ Arrays.asList(
+ "+I[1, db1_1, NULL]",
+ "+I[2, db1_2, NULL]",
+ "+I[3, db2_3, 300]",
+ "+I[4, db2_4, 400]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ // test schema evolution of t2
+ // each shard adds a different column; the merged Paimon table ends up with both
+ statement.executeUpdate("ALTER TABLE database_shard_1.t2 ADD
COLUMN v2 INT");
+ statement.executeUpdate("ALTER TABLE database_shard_2.t2 ADD
COLUMN v3 VARCHAR(10)");
+ statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES
(1, 1.1, 1)");
+ statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES
(2, 2.2, 2)");
+ statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES
(3, 3.3, 'db2_3')");
+ statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES
(4, 4.4, 'db2_4')");
+ table = getFileStoreTable("t2");
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.BIGINT().notNull(),
+ DataTypes.DOUBLE(),
+ DataTypes.INT(),
+ DataTypes.VARCHAR(10)
+ },
+ new String[] {"k", "v1", "v2", "v3"});
+ waitForResult(
+ Arrays.asList(
+ "+I[1, 1.1, 1, NULL]",
+ "+I[2, 2.2, 2, NULL]",
+ "+I[3, 3.3, NULL, db2_3]",
+ "+I[4, 4.4, NULL, db2_4]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ // test that database_shard_2.t3 won't be synchronized
+ // NOTE(review): the reason (presumably no primary key on that shard's t3) lives in the
+ // setup SQL, not here — confirm against sync_database_setup.sql
+ statement.executeUpdate(
+ "INSERT INTO database_shard_2.t3 VALUES (1, 'db2_1'), (2,
'db2_2')");
+ statement.executeUpdate(
+ "INSERT INTO database_shard_1.t3 VALUES (3, 'db1_3'), (4,
'db1_4')");
+ table = getFileStoreTable("t3");
+ rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ // only the shard_1 rows are expected
+ waitForResult(
+ Arrays.asList("+I[3, db1_3]", "+I[4, db1_4]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+
+ // test newly added table
+ // only COMBINED mode captures tables created after the job started
+ if (mode == COMBINED) {
+ statement.executeUpdate(
+ "CREATE TABLE database_shard_1.t4 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
+ statement.executeUpdate("INSERT INTO database_shard_1.t4
VALUES (1, 'db1_1')");
+
+ statement.executeUpdate(
+ "CREATE TABLE database_shard_2.t4 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
+ statement.executeUpdate("INSERT INTO database_shard_2.t4
VALUES (2, 'db2_2')");
+
+ // poll until the running job has created Paimon table t4 (bounded by @Timeout)
+ Catalog catalog = catalog();
+ while (!catalog.tableExists(Identifier.create(database,
"t4"))) {
+ Thread.sleep(100);
+ }
+
+ table = getFileStoreTable("t4");
+ rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ waitForResult(
+ Arrays.asList("+I[1, db1_1]", "+I[2, db2_2]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+ }
+ }
}
private void assertTableExists(List<String> tableNames) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
new file mode 100644
index 000000000..674c79456
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests which MySQL tables {@link MySqlSyncDatabaseAction} captures and which it ignores. */
+public class MySqlSyncDatabaseTableListITCase extends MySqlActionITCaseBase {
+
+ @BeforeAll
+ public static void startContainers() {
+ MYSQL_CONTAINER.withSetupSQL("mysql/tablelist_test_setup.sql");
+ start();
+ }
+
+ // TODO: it would be more convenient to check each table without merging shards
+ @Test
+ @Timeout(60)
+ public void testActionRunResult() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", ".*shard_.*");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ DatabaseSyncMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ null,
+ null,
+ "t.+|s.+",
+ "ta|sa",
+ Collections.emptyMap(),
+ tableConfig,
+ mode);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl("shard_1"),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = conn.createStatement()) {
+ Catalog catalog = catalog();
+ assertThat(catalog.listTables(database))
+ .containsExactlyInAnyOrder("t1", "t11", "t2", "t22", "t3",
"taa", "tb", "s2");
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(100)},
+ new String[] {"k", "name"});
+ List<String> pk = Collections.singletonList("k");
+
+ waitForResult(
+ Arrays.asList("+I[2, shard_2.t1]", "+I[3, x_shard_1.t1]"),
+ getFileStoreTable("t1"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[1, shard_1.t11]"),
+ getFileStoreTable("t11"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[1, shard_1.t2]"),
+ getFileStoreTable("t2"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[2, shard_2.t22]"),
+ getFileStoreTable("t22"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Arrays.asList("+I[1, shard_1.t3]", "+I[2, shard_2.t3]"),
+ getFileStoreTable("t3"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[1, shard_1.taa]"),
+ getFileStoreTable("taa"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[2, shard_2.tb]"),
+ getFileStoreTable("tb"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[1, shard_1.s2]"),
+ getFileStoreTable("s2"),
+ rowType,
+ pk);
+
+ // test newly added tables; only COMBINED mode picks them up without a restart
+ if (mode == COMBINED) {
+ // case 1: new tables in an existing captured database
+ statement.executeUpdate("USE shard_2");
+ // ignored: ta (matches the excluding pattern "ta|sa")
+ statement.executeUpdate(
+ "CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
+ statement.executeUpdate("INSERT INTO ta VALUES (10,
'shard_2.ta')");
+
+ // captured: s3 (matches the including pattern "t.+|s.+")
+ statement.executeUpdate(
+ "CREATE TABLE s3 (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
+ statement.executeUpdate("INSERT INTO s3 VALUES (10,
'shard_2.s3')");
+
+ // case 2: new tables in a newly created database matching ".*shard_.*"
+ statement.executeUpdate("CREATE DATABASE shard_3");
+ statement.executeUpdate("USE shard_3");
+ // ignored: ta (excluded by "ta|sa") and m (not matched by "t.+|s.+")
+ statement.executeUpdate(
+ "CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
+ statement.executeUpdate("INSERT INTO ta VALUES (10,
'shard_3.ta')");
+
+ statement.executeUpdate(
+ "CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY
(k))");
+ statement.executeUpdate("INSERT INTO m VALUES (10,
'shard_3.m')");
+
+ // captured: tab
+ statement.executeUpdate(
+ "CREATE TABLE tab (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
+ statement.executeUpdate("INSERT INTO tab VALUES (10,
'shard_3.tab')");
+
+ // case 3: new tables in a newly created database not matching ".*shard_.*"
+ statement.executeUpdate("CREATE DATABASE what");
+ statement.executeUpdate("USE what");
+ // ignored: ta
+ statement.executeUpdate(
+ "CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
+ statement.executeUpdate("INSERT INTO ta VALUES (10,
'what.ta')");
+
+ // ignored: s4 matches the including pattern, but its database is not captured
+ statement.executeUpdate(
+ "CREATE TABLE s4 (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
+ statement.executeUpdate("INSERT INTO s4 VALUES (10,
'what.s4')");
+
+ Thread.sleep(5_000); // presumably gives the job time to process the DDL above before listing tables
+
+ assertThat(catalog.listTables(database))
+ .containsExactlyInAnyOrder(
+ "t1", "t11", "t2", "t22", "t3", "taa", "tb",
"s2", "s3", "tab");
+
+ waitForResult(
+ Collections.singletonList("+I[10, shard_2.s3]"),
+ getFileStoreTable("s3"),
+ rowType,
+ pk);
+
+ waitForResult(
+ Collections.singletonList("+I[10, shard_3.tab]"),
+ getFileStoreTable("tab"),
+ rowType,
+ pk);
+ }
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index f593ad275..2ca19d522 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.action.cdc.mysql;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -1036,8 +1034,6 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
}
private FileStoreTable getFileStoreTable() throws Exception {
- Catalog catalog = catalog();
- Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog.getTable(identifier);
+ return getFileStoreTable(tableName);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 52e3a4c16..46ac60d9f 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -346,3 +346,48 @@ CREATE TABLE a (
v VARCHAR(10),
PRIMARY KEY (k)
);
+
+CREATE DATABASE database_shard_1;
+USE database_shard_1;
+
+CREATE TABLE t1 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+ k BIGINT,
+ v1 DOUBLE,
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE t3 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE DATABASE database_shard_2;
+USE database_shard_2;
+
+-- test schema merging
+CREATE TABLE t1 (
+ k INT,
+ v1 VARCHAR(20),
+ v2 BIGINT,
+ PRIMARY KEY (k)
+);
+
+-- test schema evolution
+CREATE TABLE t2 (
+ k BIGINT,
+ v1 DOUBLE,
+ PRIMARY KEY (k)
+);
+
+-- test a shard whose table has no primary key (this table is not synchronized)
+CREATE TABLE t3 (
+ k INT,
+ v1 VARCHAR(10)
+);
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
new file mode 100644
index 000000000..1237134f2
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/tablelist_test_setup.sql
@@ -0,0 +1,94 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly restrict the replication user to
the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For
example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant the test user 'paimonuser' all
privileges:
+--
+GRANT ALL PRIVILEGES ON *.* TO 'paimonuser'@'%';
+
+--
################################################################################
+-- MySqlSyncDatabaseTableListITCase
+--
################################################################################
+
+-- captured databases
+CREATE DATABASE shard_1;
+CREATE DATABASE shard_2;
+CREATE DATABASE x_shard_1;
+
+-- ignored databases
+CREATE DATABASE ignored;
+
+-- create tables
+
+USE shard_1;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
+INSERT INTO t1 VALUES (1, 'shard_1.t1');
+
+CREATE TABLE t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t11 VALUES (1, 'shard_1.t11');
+
+CREATE TABLE t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t2 VALUES (1, 'shard_1.t2');
+
+CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t3 VALUES (1, 'shard_1.t3');
+
+CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
+INSERT INTO ta VALUES (1, 'shard_1.ta');
+
+CREATE TABLE taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO taa VALUES (1, 'shard_1.taa');
+
+CREATE TABLE s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
+INSERT INTO s1 VALUES (1, 'shard_1.s1');
+
+CREATE TABLE s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO s2 VALUES (1, 'shard_1.s2');
+
+CREATE TABLE sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
+INSERT INTO sa VALUES (1, 'shard_1.sa');
+
+CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
+INSERT INTO m VALUES (1, 'shard_1.m');
+
+USE shard_2;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t1 VALUES (2, 'shard_2.t1');
+
+CREATE TABLE t2 (k INT, name VARCHAR(100)); -- ignored because of pk absence
+INSERT INTO t2 VALUES (2, 'shard_2.t2');
+
+CREATE TABLE t22 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t22 VALUES (2, 'shard_2.t22');
+
+CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t3 VALUES (2, 'shard_2.t3');
+
+CREATE TABLE tb (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO tb VALUES (2, 'shard_2.tb');
+
+USE x_shard_1;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
+INSERT INTO t1 VALUES (3, 'x_shard_1.t1');
+
+USE ignored;
+
+CREATE TABLE t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
+INSERT INTO t1 VALUES (4, 'ignored.t1');