This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a988dee59 [flink][mysql-cdc] Refactor MySqlSchema related code (#1783)
a988dee59 is described below
commit a988dee598b332d292b4083a0e8913b1e94e20aa
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 11 18:37:56 2023 +0800
[flink][mysql-cdc] Refactor MySqlSchema related code (#1783)
---
.../flink/action/cdc/mysql/MySqlActionUtils.java | 36 +++---
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 121 ++++++++-------------
.../action/cdc/mysql/MySqlSyncTableAction.java | 53 +++++----
.../cdc/mysql/schema/AllMergedMySqlTableInfo.java | 79 ++++++++++++++
.../action/cdc/mysql/{ => schema}/MySqlSchema.java | 87 +++++++--------
.../action/cdc/mysql/schema/MySqlSchemasInfo.java | 105 ++++++++++++++++++
.../action/cdc/mysql/schema/MySqlTableInfo.java | 40 +++++++
.../mysql/schema/ShardsMergedMySqlTableInfo.java | 90 +++++++++++++++
.../cdc/mysql/schema/UnmergedMySqlTableInfo.java | 63 +++++++++++
.../action/cdc/mysql/MySqlActionITCaseBase.java | 24 +++-
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 43 ++++----
.../mysql/MySqlSyncDatabaseTableListITCase.java | 87 ++++++---------
.../cdc/mysql/MySqlSyncTableActionITCase.java | 71 ++++--------
.../test/resources/mysql/sync_database_setup.sql | 21 +++-
.../src/test/resources/mysql/sync_table_setup.sql | 30 ++---
15 files changed, 641 insertions(+), 309 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index fff0fdaf2..978d2eaa8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -20,11 +20,15 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchema;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Pair;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
@@ -35,7 +39,6 @@ import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -47,7 +50,6 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -102,14 +104,14 @@ public class MySqlActionUtils {
mySqlConfig.get(MySqlSourceOptions.PASSWORD));
}
- static List<MySqlSchema> getMySqlSchemaList(
+ static MySqlSchemasInfo getMySqlTableInfos(
Configuration mySqlConfig,
- Predicate<MySqlSchema> monitorTablePredication,
+ Predicate<String> monitorTablePredication,
List<Identifier> excludedTables)
throws Exception {
Pattern databasePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
- List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
+ MySqlSchemasInfo mySqlSchemasInfo = new MySqlSchemasInfo();
try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet schemas = metaData.getCatalogs()) {
@@ -121,15 +123,16 @@ public class MySqlActionUtils {
while (tables.next()) {
String tableName =
tables.getString("TABLE_NAME");
MySqlSchema mySqlSchema =
- new MySqlSchema(
+ MySqlSchema.buildSchema(
metaData,
databaseName,
tableName,
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL));
- if (monitorTablePredication.test(mySqlSchema))
{
- mySqlSchemaList.add(mySqlSchema);
+ Identifier identifier =
Identifier.create(databaseName, tableName);
+ if (monitorTablePredication.test(tableName)) {
+ mySqlSchemasInfo.addSchema(identifier,
mySqlSchema);
} else {
-
excludedTables.add(mySqlSchema.identifier());
+ excludedTables.add(identifier);
}
}
}
@@ -137,7 +140,7 @@ public class MySqlActionUtils {
}
}
}
- return mySqlSchemaList;
+ return mySqlSchemasInfo;
}
static void assertSchemaCompatible(TableSchema paimonSchema, Schema
mySqlSchema) {
@@ -173,7 +176,7 @@ public class MySqlActionUtils {
}
static Schema buildPaimonSchema(
- MySqlSchema mySqlSchema,
+ MySqlTableInfo mySqlTableInfo,
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
List<ComputedColumn> computedColumns,
@@ -181,23 +184,24 @@ public class MySqlActionUtils {
boolean caseSensitive) {
Schema.Builder builder = Schema.newBuilder();
builder.options(paimonConfig);
+ MySqlSchema mySqlSchema = mySqlTableInfo.schema();
// build columns and primary keys from mySqlSchema
- LinkedHashMap<String, Tuple2<DataType, String>> mySqlFields;
+ LinkedHashMap<String, Pair<DataType, String>> mySqlFields;
List<String> mySqlPrimaryKeys;
if (caseSensitive) {
mySqlFields = mySqlSchema.fields();
mySqlPrimaryKeys = mySqlSchema.primaryKeys();
} else {
mySqlFields = new LinkedHashMap<>();
- for (Map.Entry<String, Tuple2<DataType, String>> entry :
+ for (Map.Entry<String, Pair<DataType, String>> entry :
mySqlSchema.fields().entrySet()) {
String fieldName = entry.getKey();
checkArgument(
!mySqlFields.containsKey(fieldName.toLowerCase()),
String.format(
"Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
- fieldName, mySqlSchema.tableName()));
+ fieldName, mySqlTableInfo.location()));
mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
}
mySqlPrimaryKeys =
@@ -206,8 +210,8 @@ public class MySqlActionUtils {
.collect(Collectors.toList());
}
- for (Map.Entry<String, Tuple2<DataType, String>> entry :
mySqlFields.entrySet()) {
- builder.column(entry.getKey(), entry.getValue().f0,
entry.getValue().f1);
+ for (Map.Entry<String, Pair<DataType, String>> entry :
mySqlFields.entrySet()) {
+ builder.column(entry.getKey(), entry.getValue().getLeft(),
entry.getValue().getRight());
}
for (ComputedColumn computedColumn : computedColumns) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index a6746904e..3c62b0aa9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -26,6 +26,8 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.schema.Schema;
@@ -46,10 +48,8 @@ import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -111,7 +111,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
private final String includingTables;
private final DatabaseSyncMode mode;
- private List<Identifier> monitoredTables;
+ private final List<Identifier> monitoredTables = new ArrayList<>();
+ private final List<Identifier> excludedTables = new ArrayList<>();
public MySqlSyncDatabaseAction(
Map<String, String> mySqlConfig,
@@ -175,16 +176,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
validateCaseInsensitive();
}
- List<Identifier> excludedTables = new ArrayList<>();
- List<MySqlSchema> beforeMerging =
- MySqlActionUtils.getMySqlSchemaList(
- mySqlConfig, monitorTablePredication(),
excludedTables);
- monitoredTables =
-
beforeMerging.stream().map(MySqlSchema::identifier).collect(Collectors.toList());
- List<MySqlSchema> mySqlSchemas = mergeShards ?
mergeShards(beforeMerging) : beforeMerging;
+ MySqlSchemasInfo mySqlSchemasInfo =
+ MySqlActionUtils.getMySqlTableInfos(
+ mySqlConfig, this::shouldMonitorTable, excludedTables);
+
+ logNonPkTables(mySqlSchemasInfo.nonPkTables());
+ List<MySqlTableInfo> mySqlTableInfos =
mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
checkArgument(
- mySqlSchemas.size() > 0,
+ mySqlTableInfos.size() > 0,
"No tables found in MySQL database "
+ mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
+ ", or MySQL database does not exist.");
@@ -194,12 +194,14 @@ public class MySqlSyncDatabaseAction extends ActionBase {
new TableNameConverter(caseSensitive, mergeShards,
tablePrefix, tableSuffix);
List<FileStoreTable> fileStoreTables = new ArrayList<>();
- for (MySqlSchema mySqlSchema : mySqlSchemas) {
- Identifier identifier = buildPaimonIdentifier(tableNameConverter,
mySqlSchema);
+ for (MySqlTableInfo tableInfo : mySqlTableInfos) {
+ Identifier identifier =
+ Identifier.create(
+ database,
tableNameConverter.convert(tableInfo.toPaimonTableName()));
FileStoreTable table;
Schema fromMySql =
MySqlActionUtils.buildPaimonSchema(
- mySqlSchema,
+ tableInfo,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
@@ -208,16 +210,18 @@ public class MySqlSyncDatabaseAction extends ActionBase {
try {
table = (FileStoreTable) catalog.getTable(identifier);
Supplier<String> errMsg =
- incompatibleMessage(table.schema(), mySqlSchema,
identifier);
+ incompatibleMessage(table.schema(), tableInfo,
identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
fileStoreTables.add(table);
+ monitoredTables.addAll(tableInfo.identifiers());
} else {
- unmonitor(mySqlSchema);
+ excludedTables.addAll(tableInfo.identifiers());
}
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromMySql, false);
table = (FileStoreTable) catalog.getTable(identifier);
fileStoreTables.add(table);
+ monitoredTables.addAll(tableInfo.identifiers());
}
}
@@ -227,7 +231,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+ "MySQL database are not compatible with those of
existed Paimon tables. Please check the log.");
MySqlSource<String> source =
- MySqlActionUtils.buildMySqlSource(mySqlConfig,
buildTableList(excludedTables));
+ MySqlActionUtils.buildMySqlSource(mySqlConfig,
buildTableList());
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
@@ -286,17 +290,16 @@ public class MySqlSyncDatabaseAction extends ActionBase {
tableSuffix));
}
- private Predicate<MySqlSchema> monitorTablePredication() {
- return schema -> {
- if (schema.primaryKeys().isEmpty()) {
- LOG.debug(
- "Didn't find primary keys from table '{}'. "
- + "This table won't be synchronized.",
- schema.identifier());
- return false;
- }
- return shouldMonitorTable(schema.tableName());
- };
+ private void logNonPkTables(List<Identifier> nonPkTables) {
+ if (!nonPkTables.isEmpty()) {
+ LOG.debug(
+ "Didn't find primary keys for tables '{}'. "
+ + "These tables won't be synchronized.",
+ nonPkTables.stream()
+ .map(Identifier::getFullName)
+ .collect(Collectors.joining(",")));
+ excludedTables.addAll(nonPkTables);
+ }
}
private boolean shouldMonitorTable(String mySqlTableName) {
@@ -325,7 +328,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
}
private Supplier<String> incompatibleMessage(
- TableSchema paimonSchema, MySqlSchema mySqlSchema, Identifier
identifier) {
+ TableSchema paimonSchema, MySqlTableInfo mySqlTableInfo,
Identifier identifier) {
return () ->
String.format(
"Incompatible schema found.\n"
@@ -333,49 +336,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+ "MySQL table is: %s, fields are: %s.\n",
identifier.getFullName(),
paimonSchema.fields(),
- mySqlSchema.tableName(),
- mySqlSchema.fields());
- }
-
- /** Merge schemas for tables that have the same table name. */
- private List<MySqlSchema> mergeShards(List<MySqlSchema> rawMySqlSchemas) {
- Map<String, MySqlSchema> schemaMap = new HashMap<>();
- for (MySqlSchema rawSchema : rawMySqlSchemas) {
- String tableName = rawSchema.tableName();
- MySqlSchema schema = schemaMap.get(tableName);
- if (schema == null) {
- schemaMap.put(tableName, rawSchema);
- } else {
- schemaMap.put(tableName, schema.merge(rawSchema));
- }
- }
- return new ArrayList<>(schemaMap.values());
- }
-
- private Identifier buildPaimonIdentifier(
- TableNameConverter tableNameConverter, MySqlSchema mySqlSchema) {
- String tableName;
- if (mergeShards) {
- tableName = tableNameConverter.convert(mySqlSchema.tableName());
- } else {
- // the Paimon table name should be compound of origin database
name and table name
- // together to avoid name conflict
- tableName = tableNameConverter.convert(mySqlSchema.identifier());
- }
-
- return Identifier.create(database, tableName);
- }
-
- private void unmonitor(MySqlSchema mySqlSchema) {
- if (mergeShards) {
- // if schema has been merged, all shards with the same table name
should be removed
- monitoredTables =
- monitoredTables.stream()
- .filter(id ->
!id.getObjectName().equals(mySqlSchema.tableName()))
- .collect(Collectors.toList());
- } else {
- monitoredTables.remove(mySqlSchema.identifier());
- }
+ mySqlTableInfo.location(),
+ mySqlTableInfo.schema().fields());
}
@VisibleForTesting
@@ -383,11 +345,16 @@ public class MySqlSyncDatabaseAction extends ActionBase {
return monitoredTables;
}
+ @VisibleForTesting
+ public List<Identifier> excludedTables() {
+ return excludedTables;
+ }
+
/**
* See {@link
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
* and {@code MySqlSyncDatabaseTableListITCase}.
*/
- private String buildTableList(List<Identifier> excludedTables) {
+ private String buildTableList() {
String separatorRex = "\\.";
if (mode == DIVIDED) {
// In DIVIDED mode, we only concern about existed tables
@@ -395,23 +362,23 @@ public class MySqlSyncDatabaseAction extends ActionBase {
.map(t -> t.getDatabaseName() + separatorRex +
t.getObjectName())
.collect(Collectors.joining("|"));
} else if (mode == COMBINED) {
- // In COMBINED mode, we should consider both existed tables and
possible newly added
+ // In COMBINED mode, we should consider both existed tables and
possible newly created
// tables, so we should use regular expression to monitor all
valid tables and exclude
// certain invalid tables
// The table list is built by template:
- //
(?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
+ //
(?!(^db\\.tbl$)|(^...$))((databasePattern)\\.(including_pattern1|...))
// The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables
whose qualified name
// is exactly equal to 'db.tbl'
- // The including pattern
databasePattern\\.(including_pattern1|...) can include tables
+ // The including pattern
(databasePattern)\\.(including_pattern1|...) can include tables
// whose qualified name matches one of the patterns
// a table can be monitored only when its name meets the including
pattern and doesn't
// be excluded by excluding pattern at the same time
String includingPattern =
String.format(
- "%s%s(%s)",
+ "(%s)%s(%s)",
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
separatorRex,
includingTables);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index c60f4fa2e..ca26288d5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -24,6 +24,8 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
+import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.schema.Schema;
@@ -143,34 +145,21 @@ public class MySqlSyncTableAction extends ActionBase {
validateCaseInsensitive();
}
- List<MySqlSchema> mySqlSchemaList =
- MySqlActionUtils.getMySqlSchemaList(
+ MySqlSchemasInfo mySqlSchemasInfo =
+ MySqlActionUtils.getMySqlTableInfos(
mySqlConfig, monitorTablePredication(), new
ArrayList<>());
-
- String tableList =
- mySqlSchemaList.stream()
- .map(m -> m.identifier().getDatabaseName() + "." +
m.tableName())
- .collect(Collectors.joining("|"));
-
- MySqlSource<String> source =
MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
-
- MySqlSchema mySqlSchema =
- mySqlSchemaList.stream()
- .reduce(MySqlSchema::merge)
- .orElseThrow(
- () ->
- new RuntimeException(
- "No table satisfies the given
database name and table name"));
+ validateMySqlTableInfos(mySqlSchemasInfo);
catalog.createDatabase(database, true);
+ MySqlTableInfo tableInfo = mySqlSchemasInfo.mergeAll();
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
List<ComputedColumn> computedColumns =
- buildComputedColumns(computedColumnArgs,
mySqlSchema.typeMapping());
+ buildComputedColumns(computedColumnArgs,
tableInfo.schema().typeMapping());
Schema fromMySql =
MySqlActionUtils.buildPaimonSchema(
- mySqlSchema,
+ tableInfo,
partitionKeys,
primaryKeys,
computedColumns,
@@ -196,6 +185,12 @@ public class MySqlSyncTableAction extends ActionBase {
table = (FileStoreTable) catalog.getTable(identifier);
}
+ String tableList =
+ mySqlSchemasInfo.pkTables().stream()
+ .map(i -> i.getDatabaseName() + "\\." +
i.getObjectName())
+ .collect(Collectors.joining("|"));
+ MySqlSource<String> source =
MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
+
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
@@ -247,11 +242,25 @@ public class MySqlSyncTableAction extends ActionBase {
}
}
- private Predicate<MySqlSchema> monitorTablePredication() {
- return schema -> {
+ private void validateMySqlTableInfos(MySqlSchemasInfo mySqlSchemasInfo) {
+ List<Identifier> nonPkTables = mySqlSchemasInfo.nonPkTables();
+ checkArgument(
+ nonPkTables.isEmpty(),
+ "Source tables of MySQL table synchronization job cannot
contain table "
+ + "which doesn't have primary keys.\n"
+ + "They are: %s",
+
nonPkTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
+
+ checkArgument(
+ !mySqlSchemasInfo.pkTables().isEmpty(),
+ "No table satisfies the given database name and table name.");
+ }
+
+ private Predicate<String> monitorTablePredication() {
+ return tableName -> {
Pattern tableNamePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
- return tableNamePattern.matcher(schema.tableName()).matches();
+ return tableNamePattern.matcher(tableName).matches();
};
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/AllMergedMySqlTableInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/AllMergedMySqlTableInfo.java
new file mode 100644
index 000000000..86ec73499
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/AllMergedMySqlTableInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Describe a table whose schema is merged from all source tables. */
+public class AllMergedMySqlTableInfo implements MySqlTableInfo {
+
+ private final List<Identifier> fromTables;
+ private MySqlSchema schema;
+
+ public AllMergedMySqlTableInfo() {
+ this.fromTables = new ArrayList<>();
+ }
+
+ public void init(Identifier identifier, MySqlSchema schema) {
+ this.fromTables.add(identifier);
+ this.schema = schema;
+ }
+
+ public AllMergedMySqlTableInfo merge(Identifier otherTableId, MySqlSchema
other) {
+ schema = schema.merge(location(), otherTableId.getFullName(), other);
+ fromTables.add(otherTableId);
+ return this;
+ }
+
+ @Override
+ public String location() {
+ return String.format(
+ "{%s}",
+
fromTables.stream().map(Identifier::getFullName).collect(Collectors.joining(",")));
+ }
+
+ @Override
+ public List<Identifier> identifiers() {
+ throw new UnsupportedOperationException(
+ "AllMergedRichMySqlSchema doesn't support converting to
identifiers.");
+ }
+
+ @Override
+ public String tableName() {
+ throw new UnsupportedOperationException(
+ "AllMergedRichMySqlSchema doesn't support getting table
name.");
+ }
+
+ @Override
+ public String toPaimonTableName() {
+ throw new UnsupportedOperationException(
+ "AllMergedRichMySqlSchema doesn't support converting to Paimon
table name.");
+ }
+
+ @Override
+ public MySqlSchema schema() {
+ return checkNotNull(schema, "MySqlSchema hasn't been set.");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
similarity index 62%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 6626fd762..3e4964aff 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -16,44 +16,42 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.action.cdc.mysql.schema;
-import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;
-
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.paimon.utils.Pair;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-
-import static org.apache.paimon.utils.Preconditions.checkState;
+import java.util.stream.Collectors;
/** Utility class to load MySQL table schema with JDBC. */
public class MySqlSchema {
- private final String databaseName;
- private final String tableName;
- private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
+ private final LinkedHashMap<String, Pair<DataType, String>> fields;
private final List<String> primaryKeys;
- private boolean hasMerged = false;
+ private MySqlSchema(
+ LinkedHashMap<String, Pair<DataType, String>> fields, List<String>
primaryKeys) {
+ this.fields = fields;
+ this.primaryKeys = primaryKeys;
+ }
- public MySqlSchema(
+ public static MySqlSchema buildSchema(
DatabaseMetaData metaData,
String databaseName,
String tableName,
boolean convertTinyintToBool)
- throws Exception {
- this.databaseName = databaseName;
- this.tableName = tableName;
-
- fields = new LinkedHashMap<>();
+ throws SQLException {
+ LinkedHashMap<String, Pair<DataType, String>> fields = new
LinkedHashMap<>();
try (ResultSet rs = metaData.getColumns(databaseName, null, tableName,
null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
@@ -70,77 +68,76 @@ public class MySqlSchema {
}
fields.put(
fieldName,
- Tuple2.of(
+ Pair.of(
MySqlTypeUtils.toDataType(
fieldType, precision, scale,
convertTinyintToBool),
fieldComment));
}
}
- primaryKeys = new ArrayList<>();
+ List<String> primaryKeys = new ArrayList<>();
try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null,
tableName)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
primaryKeys.add(fieldName);
}
}
- }
- public String tableName() {
- return tableName;
+ return new MySqlSchema(fields, primaryKeys);
}
- public Identifier identifier() {
- checkState(!hasMerged, "Cannot get table identifier from merged
schema.");
- return Identifier.create(databaseName, tableName);
+ public LinkedHashMap<String, Pair<DataType, String>> fields() {
+ return fields;
}
- public LinkedHashMap<String, Tuple2<DataType, String>> fields() {
- return fields;
+ public List<String> primaryKeys() {
+ return primaryKeys;
}
public Map<String, DataType> typeMapping() {
Map<String, DataType> typeMapping = new HashMap<>();
- fields.forEach((name, tuple) -> typeMapping.put(name, tuple.f0));
+ fields.forEach((name, pair) -> typeMapping.put(name, pair.getLeft()));
return typeMapping;
}
- public List<String> primaryKeys() {
- return primaryKeys;
- }
-
- public MySqlSchema merge(MySqlSchema other) {
- hasMerged = true;
- for (Map.Entry<String, Tuple2<DataType, String>> entry :
other.fields.entrySet()) {
+ public MySqlSchema merge(String currentTable, String otherTable,
MySqlSchema other) {
+ for (Map.Entry<String, Pair<DataType, String>> entry :
other.fields.entrySet()) {
String fieldName = entry.getKey();
- DataType newType = entry.getValue().f0;
+ DataType newType = entry.getValue().getLeft();
if (fields.containsKey(fieldName)) {
- DataType oldType = fields.get(fieldName).f0;
+ DataType oldType = fields.get(fieldName).getLeft();
switch (UpdatedDataFieldsProcessFunction.canConvert(oldType,
newType)) {
case CONVERT:
- fields.put(fieldName, Tuple2.of(newType,
entry.getValue().f1));
+ fields.put(fieldName, Pair.of(newType,
entry.getValue().getRight()));
break;
case EXCEPTION:
throw new IllegalArgumentException(
String.format(
- "Column %s have different types in
table %s.%s and table %s.%s",
+ "Column %s have different types when
merging schemas.\n"
+ + "Current table '%s' fields:
%s\n"
+ + "To be merged table '%s'
fields: %s",
fieldName,
- databaseName,
- tableName,
- other.databaseName,
- other.tableName));
+ currentTable,
+ fieldsToString(),
+ otherTable,
+ other.fieldsToString()));
}
} else {
- fields.put(fieldName, Tuple2.of(newType, entry.getValue().f1));
+ fields.put(fieldName, Pair.of(newType,
entry.getValue().getRight()));
}
}
+
if (!primaryKeys.equals(other.primaryKeys)) {
primaryKeys.clear();
}
return this;
}
- public List<String> getPrimaryKeys() {
- return primaryKeys;
+ private String fieldsToString() {
+ return "["
+ + fields.entrySet().stream()
+ .map(e -> String.format("%s %s", e.getKey(),
e.getValue().getLeft()))
+ .collect(Collectors.joining(","))
+ + "]";
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemasInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemasInfo.java
new file mode 100644
index 000000000..5ae768804
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemasInfo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utility class to manage MySQL tables and their schemas. */
+public class MySqlSchemasInfo {
+
+ private final Map<Identifier, MySqlSchema> pkTableSchemas;
+ private final Map<Identifier, MySqlSchema> nonPkTableSchemas;
+
+ public MySqlSchemasInfo() {
+ this.pkTableSchemas = new HashMap<>();
+ this.nonPkTableSchemas = new HashMap<>();
+ }
+
+ public void addSchema(Identifier identifier, MySqlSchema mysqlSchema) {
+ if (mysqlSchema.primaryKeys().isEmpty()) {
+ nonPkTableSchemas.put(identifier, mysqlSchema);
+ } else {
+ pkTableSchemas.put(identifier, mysqlSchema);
+ }
+ }
+
+ public List<Identifier> pkTables() {
+ return new ArrayList<>(pkTableSchemas.keySet());
+ }
+
+ public List<Identifier> nonPkTables() {
+ return new ArrayList<>(nonPkTableSchemas.keySet());
+ }
+
+ // only merge pk tables now
+ public MySqlTableInfo mergeAll() {
+ boolean initialized = false;
+ AllMergedMySqlTableInfo merged = new AllMergedMySqlTableInfo();
+ for (Map.Entry<Identifier, MySqlSchema> entry :
pkTableSchemas.entrySet()) {
+ Identifier id = entry.getKey();
+ MySqlSchema schema = entry.getValue();
+ if (!initialized) {
+ merged.init(id, schema);
+ initialized = true;
+ } else {
+ merged.merge(id, schema);
+ }
+ }
+ return merged;
+ }
+
+ // only handle pk tables now
+ public List<MySqlTableInfo> toMySqlTableInfos(boolean mergeShards) {
+ if (mergeShards) {
+ return mergeShards();
+ } else {
+ return pkTableSchemas.entrySet().stream()
+ .map(e -> new UnmergedMySqlTableInfo(e.getKey(),
e.getValue()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ // only merge pk tables now
+ /** Merge schemas for tables that have the same table name. */
+ private List<MySqlTableInfo> mergeShards() {
+ Map<String, ShardsMergedMySqlTableInfo> nameSchemaMap = new
HashMap<>();
+ for (Map.Entry<Identifier, MySqlSchema> entry :
pkTableSchemas.entrySet()) {
+ Identifier id = entry.getKey();
+ String tableName = id.getObjectName();
+
+ MySqlSchema toBeMerged = entry.getValue();
+ ShardsMergedMySqlTableInfo current = nameSchemaMap.get(tableName);
+ if (current == null) {
+ current = new ShardsMergedMySqlTableInfo();
+ current.init(id, toBeMerged);
+ nameSchemaMap.put(tableName, current);
+ } else {
+ nameSchemaMap.put(tableName, current.merge(id, toBeMerged));
+ }
+ }
+
+ return new ArrayList<>(nameSchemaMap.values());
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlTableInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlTableInfo.java
new file mode 100644
index 000000000..df9a5541f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlTableInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.List;
+
+/**
+ * Describes the MySQL side of one synchronized table: the MySQL table(s) it is built from and the
+ * (possibly merged) schema.
+ */
+public interface MySqlTableInfo {
+
+    /**
+     * Returns a human-readable description of where this table comes from, e.g. a full table name
+     * or a notation covering several databases. Used as context when reporting schema merge
+     * conflicts.
+     */
+    String location();
+
+    /** Returns the identifiers of all MySQL tables whose schemas make up {@link #schema()}. */
+    List<Identifier> identifiers();
+
+    /** Returns the MySQL table name. */
+    String tableName();
+
+    /** Returns the name of the Paimon table that corresponds to this MySQL table. */
+    String toPaimonTableName();
+
+    /** Returns the (possibly merged) schema of this table. */
+    MySqlSchema schema();
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/ShardsMergedMySqlTableInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/ShardsMergedMySqlTableInfo.java
new file mode 100644
index 000000000..ea43fbf2f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/ShardsMergedMySqlTableInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Describe a table whose schema is merged from shards (tables with the same name but in different
+ * databases).
+ *
+ * <p>Usage: call {@link #init} once with the first shard, then {@link #merge} for every further
+ * shard.
+ */
+public class ShardsMergedMySqlTableInfo implements MySqlTableInfo {
+
+    /** Databases of the shards merged so far, in the order they were added. */
+    private final List<String> fromDatabases;
+
+    /** Table name shared by all shards; set by {@link #init}. */
+    private String tableName;
+
+    /** Schema merged from all shards so far; set by {@link #init} and updated by {@link #merge}. */
+    private MySqlSchema schema;
+
+    public ShardsMergedMySqlTableInfo() {
+        this.fromDatabases = new ArrayList<>();
+    }
+
+    /**
+     * Initializes this info with the first shard.
+     *
+     * @param identifier identifier of the first shard; its object name becomes the table name
+     * @param schema schema of the first shard
+     */
+    public void init(Identifier identifier, MySqlSchema schema) {
+        this.fromDatabases.add(identifier.getDatabaseName());
+        this.tableName = identifier.getObjectName();
+        this.schema = schema;
+    }
+
+    /**
+     * Merges another shard into this info.
+     *
+     * <p>Exceptions thrown by {@code MySqlSchema#merge} (e.g. for incompatible column types)
+     * propagate to the caller.
+     *
+     * @param otherTableId identifier of the shard to merge; must have the same table name
+     * @param other schema of the shard to merge
+     * @return this instance
+     * @throws NullPointerException if {@link #init} has not been called yet
+     * @throws IllegalArgumentException if the shard's table name differs from the current one
+     */
+    public ShardsMergedMySqlTableInfo merge(Identifier otherTableId, MySqlSchema other) {
+        // Fail fast with a clear message; otherwise an uninitialized instance reports a misleading
+        // "current table name 'null'" mismatch or fails deep inside schema.merge.
+        checkNotNull(
+                schema,
+                "ShardsMergedMySqlTableInfo must be initialized by init() before merge() is called.");
+        checkArgument(
+                otherTableId.getObjectName().equals(tableName),
+                "Table to be merged '%s' should equals to current table name '%s'.",
+                otherTableId.getObjectName(),
+                tableName);
+
+        // location() is evaluated before the new database is recorded, so it describes the
+        // shards merged so far
+        schema = schema.merge(location(), otherTableId.getFullName(), other);
+        fromDatabases.add(otherTableId.getDatabaseName());
+        return this;
+    }
+
+    /** Returns {@code [db1,db2,...].table} listing every merged database. */
+    @Override
+    public String location() {
+        return String.format("[%s].%s", String.join(",", fromDatabases), tableName);
+    }
+
+    @Override
+    public List<Identifier> identifiers() {
+        return fromDatabases.stream()
+                .map(databaseName -> Identifier.create(databaseName, tableName))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public String tableName() {
+        return tableName;
+    }
+
+    /** Shards are merged into a single Paimon table named after the shared MySQL table name. */
+    @Override
+    public String toPaimonTableName() {
+        return tableName;
+    }
+
+    @Override
+    public MySqlSchema schema() {
+        return checkNotNull(schema, "MySqlSchema hasn't been set.");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/UnmergedMySqlTableInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/UnmergedMySqlTableInfo.java
new file mode 100644
index 000000000..ed44c708e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/UnmergedMySqlTableInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link MySqlTableInfo} backed by exactly one MySQL table, i.e. no schema merging is applied.
+ */
+public class UnmergedMySqlTableInfo implements MySqlTableInfo {
+
+    private final Identifier identifier;
+
+    private final MySqlSchema schema;
+
+    public UnmergedMySqlTableInfo(Identifier identifier, MySqlSchema schema) {
+        this.identifier = identifier;
+        this.schema = schema;
+    }
+
+    @Override
+    public MySqlSchema schema() {
+        return schema;
+    }
+
+    @Override
+    public String tableName() {
+        return identifier.getObjectName();
+    }
+
+    @Override
+    public List<Identifier> identifiers() {
+        return Collections.singletonList(identifier);
+    }
+
+    /** Returns the full name of the single source table. */
+    @Override
+    public String location() {
+        return identifier.getFullName();
+    }
+
+    /**
+     * Returns {@code <database>_<table>}. The Paimon table name is composed of both the origin
+     * database name and the table name, so that same-named tables from different databases do
+     * not conflict.
+     */
+    @Override
+    public String toPaimonTableName() {
+        String database = identifier.getDatabaseName();
+        String table = identifier.getObjectName();
+        return String.format("%s_%s", database, table);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index 9d23ebc6e..00babf813 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
@@ -40,6 +41,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -50,6 +52,7 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/** Base test class for {@link org.apache.paimon.flink.action.Action}s related
to MySQL. */
+@SuppressWarnings("BusyWait")
public class MySqlActionITCaseBase extends ActionITCaseBase {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlActionITCaseBase.class);
@@ -166,6 +169,25 @@ public class MySqlActionITCaseBase extends
ActionITCaseBase {
protected FileStoreTable getFileStoreTable(String tableName) throws
Exception {
Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog().getTable(identifier);
+ try (Catalog catalog = catalog()) {
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+ }
+
+ protected void waitingTables(String... tables) throws Exception {
+ waitingTables(Arrays.asList(tables));
+ }
+
+ protected void waitingTables(List<String> tables) throws Exception {
+ LOG.info("Waiting for tables '{}'", tables);
+ try (Catalog catalog = catalog()) {
+ while (true) {
+ List<String> actualTables = catalog.listTables(database);
+ if (actualTables.containsAll(tables)) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index eeb76d8be..4e2095344 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -1181,10 +1181,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
thread.start();
}
- Catalog catalog = catalog();
- while (!catalog.listTables(database).containsAll(tables)) {
- Thread.sleep(100);
- }
+ waitingTables(tables);
RowType newTableRowType =
RowType.of(
@@ -1202,7 +1199,13 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Timeout(60)
public void testSyncMultipleShards() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "database_shard_.*");
+
+ // test table list
+ mySqlConfig.put(
+ "database-name",
+ ThreadLocalRandom.current().nextBoolean()
+ ? "database_shard_.*"
+ : "database_shard_1|database_shard_2");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -1307,10 +1310,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
"CREATE TABLE database_shard_2.t4 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO database_shard_2.t4
VALUES (2, 'db2_2')");
- Catalog catalog = catalog();
- while (!catalog.tableExists(Identifier.create(database,
"t4"))) {
- Thread.sleep(100);
- }
+ waitingTables("t4");
table = getFileStoreTable("t4");
rowType =
@@ -1428,13 +1428,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
statement.executeUpdate(
"INSERT INTO without_merging_shard_2.t3 VALUES (2,
'test')");
- while (!catalog.listTables(database)
- .containsAll(
- Arrays.asList(
- "without_merging_shard_1_t3",
- "without_merging_shard_2_t3"))) {
- Thread.sleep(100);
- }
+ waitingTables("without_merging_shard_1_t3",
"without_merging_shard_2_t3");
table = getFileStoreTable("without_merging_shard_1_t3");
rowType =
@@ -1458,7 +1452,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
@Test
- public void testUnminitorTablesWithMergingShards() throws Exception {
+ public void testMonitoredAndExcludedTablesWithMering() throws Exception {
// create an incompatible table named t2
Catalog catalog = catalog();
catalog.createDatabase(database, true);
@@ -1472,7 +1466,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
catalog.createTable(identifier, schema, false);
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "test_unmonitor_table_shard_.*");
+ mySqlConfig.put("database-name", "monitored_and_excluded_shard_.*");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1494,8 +1488,17 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
assertThat(action.monitoredTables())
.containsOnly(
- Identifier.create("test_unmonitor_table_shard_1",
"t1"),
- Identifier.create("test_unmonitor_table_shard_2",
"t1"));
+ Identifier.create("monitored_and_excluded_shard_1",
"t1"),
+ Identifier.create("monitored_and_excluded_shard_1",
"t3"),
+ Identifier.create("monitored_and_excluded_shard_2",
"t1"));
+
+ assertThat(action.excludedTables())
+ .containsOnly(
+ // t2 is merged, so all shards will be excluded
+ Identifier.create("monitored_and_excluded_shard_1",
"t2"),
+ Identifier.create("monitored_and_excluded_shard_2",
"t2"),
+ // non pk table
+ Identifier.create("monitored_and_excluded_shard_2",
"t3"));
}
private void assertTableExists(List<String> tableNames) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index faf9f2977..2c73dc330 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -18,8 +18,10 @@
package org.apache.paimon.flink.action.cdc.mysql;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
@@ -30,7 +32,6 @@ import org.junit.jupiter.api.Timeout;
import java.sql.Statement;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -51,7 +52,11 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
@Timeout(120)
public void testActionRunResult() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", ".*shard_.*");
+ mySqlConfig.put(
+ "database-name",
+ ThreadLocalRandom.current().nextBoolean()
+ ? ".*shard_.*"
+ : "shard_1|shard_2|shard_3|x_shard_1");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -78,24 +83,33 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Statement statement = getStatement()) {
- Catalog catalog = catalog();
- List<String> tables = waitingAllTables(catalog, 10, 10);
- assertThat(tables)
- .containsExactlyInAnyOrder(
- "shard_1_t11",
- "shard_1_t2",
- "shard_1_t3",
- "shard_1_taa",
- "shard_1_s2",
- "shard_2_t1",
- "shard_2_t22",
- "shard_2_t3",
- "shard_2_tb",
- "x_shard_1_t1");
-
- // test newly added tables
- if (mode == COMBINED) {
+ assertThat(catalog().listTables(database))
+ .containsExactlyInAnyOrder(
+ "shard_1_t11",
+ "shard_1_t2",
+ "shard_1_t3",
+ "shard_1_taa",
+ "shard_1_s2",
+ "shard_2_t1",
+ "shard_2_t22",
+ "shard_2_t3",
+ "shard_2_tb",
+ "x_shard_1_t1");
+
+ // test newly created tables
+ if (mode == COMBINED) {
+ try (Statement statement = getStatement()) {
+ // ensure the job steps into incremental phase
+ statement.executeUpdate("USE shard_1");
+ statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')");
+ waitForResult(
+ Collections.singletonList("+I[1, A]"),
+ getFileStoreTable("shard_1_t2"),
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(100)},
+ new String[] {"k", "name"}),
+ Collections.singletonList("k"));
+
// case 1: new tables in existed database
statement.executeUpdate("USE shard_2");
// ignored: ta
@@ -127,37 +141,8 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
statement.executeUpdate(
"CREATE TABLE s4 (k INT, name VARCHAR(100), PRIMARY
KEY (k))");
- tables = waitingAllTables(catalog, 12, 10);
-
- assertThat(tables)
- .containsExactlyInAnyOrder(
- // old
- "shard_1_t11",
- "shard_1_t2",
- "shard_1_t3",
- "shard_1_taa",
- "shard_1_s2",
- "shard_2_t1",
- "shard_2_t22",
- "shard_2_t3",
- "shard_2_tb",
- "x_shard_1_t1",
- // new
- "shard_2_s3",
- "shard_3_tab");
+ waitingTables("shard_2_s3", "shard_3_tab");
}
}
}
-
- private List<String> waitingAllTables(Catalog catalog, int numberOfTables,
int maxAttempt)
- throws Exception {
- List<String> tables;
- int attempt = 0;
- do {
- Thread.sleep(5_000);
- tables = catalog.listTables(database);
- } while (tables.size() < numberOfTables && ++attempt < maxAttempt);
-
- return tables;
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9f22898a5..1195f3dac 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -42,6 +43,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -648,13 +650,12 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
Collections.emptyMap());
assertThatThrownBy(() -> action.build(env))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage(
- "Column v1 have different types in table "
- + DATABASE_NAME
- + ".incompatible_field_1 and table "
- + DATABASE_NAME
- + ".incompatible_field_2");
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ IllegalArgumentException.class,
+ "Column v1 have different types when merging
schemas.\n"
+ + "Current table
'{paimon_sync_table.incompatible_field_2}' fields: [_id INT,v1 INT]\n"
+ + "To be merged table
'paimon_sync_table.incompatible_field_1' fields: [_id INT,v1 TIMESTAMP(0)]"));
}
@Test
@@ -983,10 +984,16 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
}
@Test
+ @Timeout(60)
public void testSyncShards() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "shard_.+");
- mySqlConfig.put("table-name", "t.+");
+
+ // test table list
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ String dbPattern = random.nextBoolean() ? "shard_.+" :
"shard_1|shard_2";
+ String tblPattern = random.nextBoolean() ? "t.+" : "t1|t2";
+ mySqlConfig.put("database-name", dbPattern);
+ mySqlConfig.put("table-name", tblPattern);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -1010,9 +1017,11 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
try (Statement statement = getStatement()) {
statement.execute("USE shard_1");
- statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30'),
(2, '2023-07-30')");
+ statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30')");
+ statement.executeUpdate("INSERT INTO t2 VALUES (2, '2023-07-30')");
statement.execute("USE shard_2");
- statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31'),
(4, '2023-07-31')");
+ statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31')");
+ statement.executeUpdate("INSERT INTO t1 VALUES (4, '2023-07-31')");
}
FileStoreTable table = getFileStoreTable();
@@ -1035,46 +1044,6 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
Arrays.asList("pk", "pt"));
}
- @Test
- public void testSyncMultipleTable() throws Exception {
- Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "paimon_multiple_table");
- mySqlConfig.put("table-name", "t1|t2");
-
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.emptyList(),
- Collections.singletonList("id"),
- Collections.emptyList(),
- Collections.emptyMap(),
- Collections.emptyMap());
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
-
- FileStoreTable table = getFileStoreTable();
- RowType rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
- },
- new String[] {"id", "name"});
- waitForResult(
- Arrays.asList("+I[1, flink]", "+I[2, paimon]"),
- table,
- rowType,
- Collections.singletonList("id"));
- }
-
private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
index 59c720c06..b4343f1f2 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql
@@ -432,11 +432,11 @@ CREATE TABLE t2 (
);
--
################################################################################
--- testUnmonitorTablesWithMergingShards
+-- testMonitoredAndExcludedTablesWithMering
--
################################################################################
-CREATE DATABASE test_unmonitor_table_shard_1;
-USE test_unmonitor_table_shard_1;
+CREATE DATABASE monitored_and_excluded_shard_1;
+USE monitored_and_excluded_shard_1;
CREATE TABLE t1 (
k INT,
@@ -450,9 +450,15 @@ CREATE TABLE t2 (
PRIMARY KEY (k)
);
+CREATE TABLE t3 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
-CREATE DATABASE test_unmonitor_table_shard_2;
-USE test_unmonitor_table_shard_2;
+
+CREATE DATABASE monitored_and_excluded_shard_2;
+USE monitored_and_excluded_shard_2;
CREATE TABLE t1 (
k INT,
@@ -466,3 +472,8 @@ CREATE TABLE t2 (
PRIMARY KEY (k)
);
+CREATE TABLE t3 (
+ k INT,
+ v2 VARCHAR(10)
+);
+
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 8971afc1d..e10df0afa 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -308,35 +308,23 @@ CREATE TABLE t1 (
PRIMARY KEY (pk)
);
-CREATE DATABASE shard_2;
-USE shard_2;
-
-CREATE TABLE t1 (
+CREATE TABLE t2 (
pk INT,
_date VARCHAR(10),
PRIMARY KEY (pk)
);
-
---
################################################################################
--- testSyncMultipleTable
---
################################################################################
-
-CREATE DATABASE paimon_multiple_table;
-USE paimon_multiple_table;
+CREATE DATABASE shard_2;
+USE shard_2;
CREATE TABLE t1 (
- id INT,
- name VARCHAR(10),
- PRIMARY KEY (id)
+ pk INT,
+ _date VARCHAR(10),
+ PRIMARY KEY (pk)
);
-INSERT INTO t1 VALUES (1, 'flink');
-
CREATE TABLE t2 (
- id INT,
- name VARCHAR(10),
- PRIMARY KEY (id)
+ pk INT,
+ _date VARCHAR(10),
+ PRIMARY KEY (pk)
);
-
-INSERT INTO t2 VALUES (2, 'paimon');
\ No newline at end of file