This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 8272b295b7dddb3993f9d24bcbef3ad16adf2321
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 25 10:03:09 2023 +0800

    [flink] Support skipping incompatible tables in MySqlSyncDatabaseAction 
(#1006)
---
 docs/content/how-to/cdc-ingestion.md               |  3 +
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java |  4 +
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 72 +++++++++++++++---
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 85 +++++++++++++++++++++-
 .../src/test/resources/mysql/setup.sql             | 21 ++++++
 5 files changed, 173 insertions(+), 12 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index afa198652..fc33e2bf4 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -112,6 +112,7 @@ To use this feature through `flink run`, run the following 
shell command.
     mysql-sync-database
     --warehouse <warehouse-path> \
     --database <database-name> \
+    [--ignore-incompatible <true/false>] \
     [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
@@ -119,6 +120,8 @@ To use this feature through `flink run`, run the following 
shell command.
 
 * `--warehouse` is the path to Paimon warehouse.
 * `--database` is the database name in Paimon catalog.
+* `--ignore-incompatible` defaults to `false`. In that case, if a MySQL table name already exists in Paimon and the two schemas are incompatible, 
+an exception will be thrown. Set it to `true` explicitly to skip the incompatible tables instead of failing with an exception.
 * `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format `key=value`. `hostname`, 
`username`, `password` and `database-name` are required configurations, others 
are optional. Note that `database-name` should be the exact name of the MySQL 
databse you want to synchronize. It can't be a regular expression. See its 
[document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto
 [...]
 * `--catalog-conf` is the configuration for Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
 * `--table-conf` is the configuration for Paimon table sink. Each 
configuration should be specified in the format `key=value`. All Paimon sink 
table will be applied the same set of configurations. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of table configurations.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 74f4746e5..fe32fbef0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -87,6 +87,10 @@ public class MySqlSchema {
         }
     }
 
+    /** Returns the database name recorded in this schema. */
+    public String databaseName() {
+        return this.databaseName;
+    }
+
     public String originalTableName() {
         return originalTableName;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index f867a6d44..cbf67c4c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -29,6 +29,7 @@ import 
org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
@@ -38,6 +39,8 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -49,7 +52,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * An {@link Action} which synchronize the whole MySQL database into one 
Paimon database.
@@ -88,9 +90,12 @@ import java.util.stream.Collectors;
  */
 public class MySqlSyncDatabaseAction implements Action {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSyncDatabaseAction.class);
+
     private final Configuration mySqlConfig;
     private final String warehouse;
     private final String database;
+    private final boolean ignoreIncompatible;
     private final Map<String, String> catalogConfig;
     private final Map<String, String> tableConfig;
 
@@ -98,11 +103,13 @@ public class MySqlSyncDatabaseAction implements Action {
             Map<String, String> mySqlConfig,
             String warehouse,
             String database,
+            boolean ignoreIncompatible,
             Map<String, String> catalogConfig,
             Map<String, String> tableConfig) {
         this.mySqlConfig = Configuration.fromMap(mySqlConfig);
         this.warehouse = warehouse;
         this.database = database;
+        this.ignoreIncompatible = ignoreIncompatible;
         this.catalogConfig = catalogConfig;
         this.tableConfig = tableConfig;
     }
@@ -136,24 +143,18 @@ public class MySqlSyncDatabaseAction implements Action {
                         + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
                         + ", or MySQL database does not exist.");
 
-        mySqlConfig.set(
-                MySqlSourceOptions.TABLE_NAME,
-                "("
-                        + mySqlSchemas.stream()
-                                .map(MySqlSchema::originalTableName)
-                                .collect(Collectors.joining("|"))
-                        + ")");
-        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig);
-
         catalog.createDatabase(database, true);
 
         List<FileStoreTable> fileStoreTables = new ArrayList<>();
+        List<String> monitoredTables = new ArrayList<>();
         for (MySqlSchema mySqlSchema : mySqlSchemas) {
             Identifier identifier = new Identifier(database, 
mySqlSchema.tableName());
             FileStoreTable table;
             try {
                 table = (FileStoreTable) catalog.getTable(identifier);
-                MySqlActionUtils.assertSchemaCompatible(table.schema(), 
mySqlSchema);
+                if (shouldMonitorTable(table.schema(), mySqlSchema, 
identifier)) {
+                    monitoredTables.add(mySqlSchema.originalTableName());
+                }
             } catch (Catalog.TableNotExistException e) {
                 Schema schema =
                         MySqlActionUtils.buildPaimonSchema(
@@ -163,10 +164,20 @@ public class MySqlSyncDatabaseAction implements Action {
                                 tableConfig);
                 catalog.createTable(identifier, schema, false);
                 table = (FileStoreTable) catalog.getTable(identifier);
+                monitoredTables.add(mySqlSchema.originalTableName());
             }
             fileStoreTables.add(table);
         }
 
+        Preconditions.checkState(
+                !monitoredTables.isEmpty(),
+                "No tables to be synchronized. Possible cause is the schemas 
of all tables in specified "
+                        + "MySQL database are not compatible with those of 
existed Paimon tables. Please check the log.");
+
+        mySqlConfig.set(
+                MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", 
monitoredTables) + ")");
+        MySqlSource<String> source = 
MySqlActionUtils.buildMySqlSource(mySqlConfig);
+
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
         EventParser.Factory<String> parserFactory =
@@ -207,6 +218,37 @@ public class MySqlSyncDatabaseAction implements Action {
         return mySqlSchemaList;
     }
 
+    /**
+     * Decides whether a MySQL table whose Paimon counterpart already exists should be synchronized.
+     *
+     * @param tableSchema schema of the existing Paimon table
+     * @param mySqlSchema schema of the MySQL table
+     * @param identifier identifier of the Paimon table, only used to build messages
+     * @return {@code true} if both schemas are compatible; {@code false} if they are not and
+     *     {@code ignoreIncompatible} is set, in which case a warning is logged
+     * @throws IllegalArgumentException if the schemas are incompatible and {@code
+     *     ignoreIncompatible} is not set
+     */
+    private boolean shouldMonitorTable(
+            TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier identifier) {
+        if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
+            return true;
+        }
+
+        if (!ignoreIncompatible) {
+            // Strict mode: fail and tell the user how to opt out of the check.
+            String message =
+                    String.format(
+                            "Incompatible schema found.\n"
+                                    + "Paimon table is: %s, fields are: %s.\n"
+                                    + "MySQL table is: %s.%s, fields are: %s.\n"
+                                    + "If you want to ignore the incompatible tables, "
+                                    + "please specify --ignore-incompatible to true.",
+                            identifier.getFullName(),
+                            tableSchema.fields(),
+                            mySqlSchema.databaseName(),
+                            mySqlSchema.originalTableName(),
+                            mySqlSchema.fields());
+            throw new IllegalArgumentException(message);
+        }
+
+        // Lenient mode: report the mismatch and leave this table out.
+        LOG.warn(
+                "Incompatible schema found. This table will be ignored.\n"
+                        + "Paimon table is: {}, fields are: {}.\n"
+                        + "MySQL table is: {}.{}, fields are: {}.",
+                identifier.getFullName(),
+                tableSchema.fields(),
+                mySqlSchema.databaseName(),
+                mySqlSchema.originalTableName(),
+                mySqlSchema.fields());
+        return false;
+    }
+
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
@@ -221,6 +263,7 @@ public class MySqlSyncDatabaseAction implements Action {
 
         String warehouse = params.get("warehouse");
         String database = params.get("database");
+        boolean ignoreIncompatible = 
Boolean.parseBoolean(params.get("ignore-incompatible"));
 
         Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
         Map<String, String> catalogConfig = getConfigMap(params, 
"catalog-conf");
@@ -234,6 +277,7 @@ public class MySqlSyncDatabaseAction implements Action {
                         mySqlConfig,
                         warehouse,
                         database,
+                        ignoreIncompatible,
                         catalogConfig == null ? Collections.emptyMap() : 
catalogConfig,
                         tableConfig == null ? Collections.emptyMap() : 
tableConfig));
     }
@@ -270,11 +314,17 @@ public class MySqlSyncDatabaseAction implements Action {
         System.out.println("Syntax:");
         System.out.println(
                 "  mysql-sync-database --warehouse <warehouse-path> --database 
<database-name> "
+                        + "[--ignore-incompatible <true/false>]"
                         + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
         System.out.println();
 
+        System.out.println(
+                "--ignore-incompatible is default false, in this case, if 
MySQL table name exists in Paimon "
+                        + "and their schema is incompatible, an exception will 
be thrown. "
+                        + "You can specify it to true explicitly to ignore the 
incompatible tables and exception.");
+
         System.out.println("MySQL CDC source conf syntax:");
         System.out.println("  key=value");
         System.out.println(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 380c2a9a4..5d5e021c0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -70,7 +71,12 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 
1));
         MySqlSyncDatabaseAction action =
                 new MySqlSyncDatabaseAction(
-                        mySqlConfig, warehouse, database, 
Collections.emptyMap(), tableConfig);
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        Collections.emptyMap(),
+                        tableConfig);
         action.build(env);
         JobClient client = env.executeAsync();
 
@@ -218,6 +224,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         mySqlConfig,
                         warehouse,
                         database,
+                        false,
                         Collections.emptyMap(),
                         Collections.emptyMap());
 
@@ -244,6 +251,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         mySqlConfig,
                         warehouse,
                         database,
+                        false,
                         Collections.emptyMap(),
                         Collections.emptyMap());
 
@@ -257,6 +265,81 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         "No tables found in MySQL database invalid, or MySQL 
database does not exist.");
     }
 
+    /**
+     * Verifies that, with {@code ignoreIncompatible} enabled, building the job does not fail on a
+     * MySQL table that clashes with an already existing Paimon table, and that the table named
+     * {@code compatible} is still synchronized.
+     */
+    @Test
+    @Timeout(60)
+    public void testIgnoreIncompatibleTables() throws Exception {
+        // Pre-create a Paimon table named `incompatible` whose column types (STRING) differ from
+        // those of the MySQL table with the same name.
+        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        catalog.createDatabase(database, true);
+        Identifier identifier = Identifier.create(database, "incompatible");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("k", DataTypes.STRING())
+                        .column("v1", DataTypes.STRING())
+                        .primaryKey("k")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+
+        // try synchronization
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "paimon_sync_database_ignore_incompatible");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        true,
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        // Wait until the job is running. If it ends up in a terminal state instead, fail right
+        // away with the observed status rather than polling until the test timeout expires.
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            if (status.isGloballyTerminalState()) {
+                throw new IllegalStateException(
+                        "Job reached terminal state " + status + " before running.");
+            }
+            Thread.sleep(1000);
+        }
+
+        // validate `compatible` can be synchronized
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            FileStoreTable table = getFileStoreTable("compatible");
+
+            statement.executeUpdate("USE paimon_sync_database_ignore_incompatible");
+            statement.executeUpdate("INSERT INTO compatible VALUES (2, 'two', 20, 200)");
+            statement.executeUpdate("INSERT INTO compatible VALUES (4, 'four', 40, 400)");
+
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(10).notNull(),
+                                DataTypes.INT(),
+                                DataTypes.BIGINT()
+                            },
+                            new String[] {"k1", "k2", "v1", "v2"});
+            List<String> primaryKeys = Arrays.asList("k1", "k2");
+            List<String> expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
+            waitForResult(expected, table, rowType, primaryKeys);
+        }
+    }
+
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 0b17f85cc..92fcfd4f2 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -282,3 +282,24 @@ CREATE TABLE t2 (
 CREATE TABLE t3 (
     v1 INT
 );
+
+-- 
################################################################################
+--  MySqlSyncDatabaseActionITCase#testIgnoreIncompatibleTables
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database_ignore_incompatible;
+USE paimon_sync_database_ignore_incompatible;
+
+CREATE TABLE incompatible ( -- the test pre-creates a Paimon table with this name but with STRING columns
+    k INT, -- declared as STRING in the pre-created Paimon table
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE compatible ( -- the test does not pre-create a Paimon table for this one
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);

Reply via email to