This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 8272b295b7dddb3993f9d24bcbef3ad16adf2321 Author: yuzelin <[email protected]> AuthorDate: Tue Apr 25 10:03:09 2023 +0800 [flink] Support skipping incompatible tables in MySqlSyncDatabaseAction (#1006) --- docs/content/how-to/cdc-ingestion.md | 3 + .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 4 + .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 72 +++++++++++++++--- .../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 85 +++++++++++++++++++++- .../src/test/resources/mysql/setup.sql | 21 ++++++ 5 files changed, 173 insertions(+), 12 deletions(-) diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index afa198652..fc33e2bf4 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -112,6 +112,7 @@ To use this feature through `flink run`, run the following shell command. mysql-sync-database --warehouse <warehouse-path> \ --database <database-name> \ + [--ignore-incompatible <true/false>] \ [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \ [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \ [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]] @@ -119,6 +120,8 @@ To use this feature through `flink run`, run the following shell command. * `--warehouse` is the path to Paimon warehouse. * `--database` is the database name in Paimon catalog. +* `--ignore-incompatible` is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible, +an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception. * `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password` and `database-name` are required configurations, others are optional. 
Note that `database-name` should be the exact name of the MySQL database you want to synchronize. It can't be a regular expression. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto [...] * `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations. * `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. All Paimon sink table will be applied the same set of configurations. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations. diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java index 74f4746e5..fe32fbef0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java @@ -87,6 +87,10 @@ public class MySqlSchema { } } + public String databaseName() { + return databaseName; + } + public String originalTableName() { return originalTableName; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index f867a6d44..cbf67c4c3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -29,6 +29,7 @@ import 
org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; @@ -38,6 +39,8 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -49,7 +52,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; /** * An {@link Action} which synchronize the whole MySQL database into one Paimon database. @@ -88,9 +90,12 @@ import java.util.stream.Collectors; */ public class MySqlSyncDatabaseAction implements Action { + private static final Logger LOG = LoggerFactory.getLogger(MySqlSyncDatabaseAction.class); + private final Configuration mySqlConfig; private final String warehouse; private final String database; + private final boolean ignoreIncompatible; private final Map<String, String> catalogConfig; private final Map<String, String> tableConfig; @@ -98,11 +103,13 @@ public class MySqlSyncDatabaseAction implements Action { Map<String, String> mySqlConfig, String warehouse, String database, + boolean ignoreIncompatible, Map<String, String> catalogConfig, Map<String, String> tableConfig) { this.mySqlConfig = Configuration.fromMap(mySqlConfig); this.warehouse = warehouse; this.database = database; + this.ignoreIncompatible = ignoreIncompatible; this.catalogConfig = catalogConfig; this.tableConfig = tableConfig; } @@ -136,24 +143,18 @@ public class MySqlSyncDatabaseAction implements Action { + 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME) + ", or MySQL database does not exist."); - mySqlConfig.set( - MySqlSourceOptions.TABLE_NAME, - "(" - + mySqlSchemas.stream() - .map(MySqlSchema::originalTableName) - .collect(Collectors.joining("|")) - + ")"); - MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig); - catalog.createDatabase(database, true); List<FileStoreTable> fileStoreTables = new ArrayList<>(); + List<String> monitoredTables = new ArrayList<>(); for (MySqlSchema mySqlSchema : mySqlSchemas) { Identifier identifier = new Identifier(database, mySqlSchema.tableName()); FileStoreTable table; try { table = (FileStoreTable) catalog.getTable(identifier); - MySqlActionUtils.assertSchemaCompatible(table.schema(), mySqlSchema); + if (shouldMonitorTable(table.schema(), mySqlSchema, identifier)) { + monitoredTables.add(mySqlSchema.originalTableName()); + } } catch (Catalog.TableNotExistException e) { Schema schema = MySqlActionUtils.buildPaimonSchema( @@ -163,10 +164,20 @@ public class MySqlSyncDatabaseAction implements Action { tableConfig); catalog.createTable(identifier, schema, false); table = (FileStoreTable) catalog.getTable(identifier); + monitoredTables.add(mySqlSchema.originalTableName()); } fileStoreTables.add(table); } + Preconditions.checkState( + !monitoredTables.isEmpty(), + "No tables to be synchronized. Possible cause is the schemas of all tables in specified " + + "MySQL database are not compatible with those of existed Paimon tables. Please check the log."); + + mySqlConfig.set( + MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", monitoredTables) + ")"); + MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig); + String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE); ZoneId zoneId = serverTimeZone == null ? 
ZoneId.systemDefault() : ZoneId.of(serverTimeZone); EventParser.Factory<String> parserFactory = @@ -207,6 +218,37 @@ public class MySqlSyncDatabaseAction implements Action { return mySqlSchemaList; } + private boolean shouldMonitorTable( + TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier identifier) { + if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) { + return true; + } else if (ignoreIncompatible) { + LOG.warn( + "Incompatible schema found. This table will be ignored.\n" + + "Paimon table is: {}, fields are: {}.\n" + + "MySQL table is: {}.{}, fields are: {}.", + identifier.getFullName(), + tableSchema.fields(), + mySqlSchema.databaseName(), + mySqlSchema.originalTableName(), + mySqlSchema.fields()); + return false; + } else { + throw new IllegalArgumentException( + String.format( + "Incompatible schema found.\n" + + "Paimon table is: %s, fields are: %s.\n" + + "MySQL table is: %s.%s, fields are: %s.\n" + + "If you want to ignore the incompatible tables, " + + "please specify --ignore-incompatible to true.", + identifier.getFullName(), + tableSchema.fields(), + mySqlSchema.databaseName(), + mySqlSchema.originalTableName(), + mySqlSchema.fields())); + } + } + // ------------------------------------------------------------------------ // Flink run methods // ------------------------------------------------------------------------ @@ -221,6 +263,7 @@ public class MySqlSyncDatabaseAction implements Action { String warehouse = params.get("warehouse"); String database = params.get("database"); + boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible")); Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf"); Map<String, String> catalogConfig = getConfigMap(params, "catalog-conf"); @@ -234,6 +277,7 @@ public class MySqlSyncDatabaseAction implements Action { mySqlConfig, warehouse, database, + ignoreIncompatible, catalogConfig == null ? Collections.emptyMap() : catalogConfig, tableConfig == null ? 
Collections.emptyMap() : tableConfig)); } @@ -270,11 +314,17 @@ public class MySqlSyncDatabaseAction implements Action { System.out.println("Syntax:"); System.out.println( " mysql-sync-database --warehouse <warehouse-path> --database <database-name> " + + "[--ignore-incompatible <true/false>]" + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] " + "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] " + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]"); System.out.println(); + System.out.println( + "--ignore-incompatible is default false, in this case, if MySQL table name exists in Paimon " + + "and their schema is incompatible, an exception will be thrown. " + + "You can specify it to true explicitly to ignore the incompatible tables and exception."); + System.out.println("MySQL CDC source conf syntax:"); System.out.println(" key=value"); System.out.println( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 380c2a9a4..5d5e021c0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -70,7 +71,12 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { 
tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); MySqlSyncDatabaseAction action = new MySqlSyncDatabaseAction( - mySqlConfig, warehouse, database, Collections.emptyMap(), tableConfig); + mySqlConfig, + warehouse, + database, + false, + Collections.emptyMap(), + tableConfig); action.build(env); JobClient client = env.executeAsync(); @@ -218,6 +224,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { mySqlConfig, warehouse, database, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -244,6 +251,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { mySqlConfig, warehouse, database, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -257,6 +265,81 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { "No tables found in MySQL database invalid, or MySQL database does not exist."); } + @Test + @Timeout(60) + public void testIgnoreIncompatibleTables() throws Exception { + // create an incompatible table + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + catalog.createDatabase(database, true); + Identifier identifier = Identifier.create(database, "incompatible"); + Schema schema = + Schema.newBuilder() + .column("k", DataTypes.STRING()) + .column("v1", DataTypes.STRING()) + .primaryKey("k") + .build(); + catalog.createTable(identifier, schema, false); + + // try synchronization + Map<String, String> mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "paimon_sync_database_ignore_incompatible"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + 
tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + MySqlSyncDatabaseAction action = + new MySqlSyncDatabaseAction( + mySqlConfig, + warehouse, + database, + true, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + + while (true) { + JobStatus status = client.getJobStatus().get(); + if (status == JobStatus.RUNNING) { + break; + } + Thread.sleep(1000); + } + + // validate `compatible` can be synchronized + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement statement = conn.createStatement()) { + FileStoreTable table = getFileStoreTable("compatible"); + + statement.executeUpdate("USE paimon_sync_database_ignore_incompatible"); + statement.executeUpdate("INSERT INTO compatible VALUES (2, 'two', 20, 200)"); + statement.executeUpdate("INSERT INTO compatible VALUES (4, 'four', 40, 400)"); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10).notNull(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"k1", "k2", "v1", "v2"}); + List<String> primaryKeys2 = Arrays.asList("k1", "k2"); + List<String> expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"); + waitForResult(expected, table, rowType, primaryKeys2); + } + } + private FileStoreTable getFileStoreTable(String tableName) throws Exception { Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); Identifier identifier = Identifier.create(database, tableName); diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql index 0b17f85cc..92fcfd4f2 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql @@ -282,3 
+282,24 @@ CREATE TABLE t2 ( CREATE TABLE t3 ( v1 INT ); + +-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testIgnoreIncompatibleTables +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_ignore_incompatible; +USE paimon_sync_database_ignore_incompatible; + +CREATE TABLE incompatible ( + k INT, + v1 VARCHAR(10), + PRIMARY KEY (k) +); + +CREATE TABLE compatible ( + k1 INT, + k2 VARCHAR(10), + v1 INT, + v2 BIGINT, + PRIMARY KEY (k1, k2) +);
