This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 5f0a184e24e141bb6e3d76b21eee41b0e2759baf
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 25 21:43:52 2023 +0800

    [flink] Support specifying table affix in MySQL CDC action (#1019)
---
 docs/content/how-to/cdc-ingestion.md               |   5 +
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |   9 +-
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java |  12 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  86 ++++++++++--
 .../flink/action/cdc/mysql/TableNameConverter.java |  46 ++++++
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 156 +++++++++++++++++++++
 .../src/test/resources/mysql/setup.sql             |  20 +++
 7 files changed, 309 insertions(+), 25 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index fc33e2bf4..204a7eb8a 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -113,6 +113,8 @@ To use this feature through `flink run`, run the following 
shell command.
     --warehouse <warehouse-path> \
     --database <database-name> \
     [--ignore-incompatible <true/false>] \
+    [--table-prefix <paimon-table-prefix>] \
+    [--table-suffix <paimon-table-suffix>] \
     [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
@@ -122,6 +124,9 @@ To use this feature through `flink run`, run the following 
shell command.
 * `--database` is the database name in Paimon catalog.
 * `--ignore-incompatible` is default false, in this case, if MySQL table name 
exists in Paimon and their schema is incompatible, 
 an exception will be thrown. You can specify it to true explicitly to ignore 
the incompatible tables and exception.
+* `--table-prefix` is the prefix of all Paimon tables to be synchronized. For 
example, if you want all synchronized tables 
+to have "ods_" as prefix, you can specify `--table-prefix ods_`.
+* `--table-suffix` is the suffix of all Paimon tables to be synchronized. The 
usage is same as `--table-prefix`.
 * `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format `key=value`. `hostname`, 
`username`, `password` and `database-name` are required configurations, others 
are optional. Note that `database-name` should be the exact name of the MySQL 
databse you want to synchronize. It can't be a regular expression. See its 
[document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto
 [...]
 * `--catalog-conf` is the configuration for Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
 * `--table-conf` is the configuration for Paimon table sink. Each 
configuration should be specified in the format `key=value`. All Paimon sink 
table will be applied the same set of configurations. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of table configurations.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 173917fc7..d48c19301 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -62,14 +62,21 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final ZoneId serverTimeZone;
     private final boolean caseSensitive;
+    private final TableNameConverter tableNameConverter;
 
     private JsonNode payload;
     private Map<String, String> mySqlFieldTypes;
     private Map<String, String> fieldClassNames;
 
     public MySqlDebeziumJsonEventParser(ZoneId serverTimeZone, boolean 
caseSensitive) {
+        this(serverTimeZone, caseSensitive, new 
TableNameConverter(caseSensitive));
+    }
+
+    public MySqlDebeziumJsonEventParser(
+            ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter 
tableNameConverter) {
         this.serverTimeZone = serverTimeZone;
         this.caseSensitive = caseSensitive;
+        this.tableNameConverter = tableNameConverter;
     }
 
     @Override
@@ -95,7 +102,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     @Override
     public String tableName() {
         String tableName = payload.get("source").get("table").asText();
-        return caseSensitive ? tableName : tableName.toLowerCase();
+        return tableNameConverter.convert(tableName);
     }
 
     private void updateFieldTypes(JsonNode schema) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index fe32fbef0..6b5c2831e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -33,10 +33,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 /** Utility class to load MySQL table schema with JDBC. */
 public class MySqlSchema {
 
-    // used for retrieving metadata and throwing error, do not convert to 
case-insensitive form
     private final String databaseName;
-    private final String originalTableName;
-    // might be converted to case-insensitive form
     private final String tableName;
 
     private final LinkedHashMap<String, DataType> fields;
@@ -46,8 +43,7 @@ public class MySqlSchema {
             DatabaseMetaData metaData, String databaseName, String tableName, 
boolean caseSensitive)
             throws Exception {
         this.databaseName = databaseName;
-        this.originalTableName = tableName;
-        this.tableName = caseSensitive ? tableName : tableName.toLowerCase();
+        this.tableName = tableName;
 
         fields = new LinkedHashMap<>();
         try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, 
null)) {
@@ -68,7 +64,7 @@ public class MySqlSchema {
                             !fields.containsKey(fieldName.toLowerCase()),
                             String.format(
                                     "Duplicate key '%s' in table '%s.%s' 
appears when converting fields map keys to case-insensitive form.",
-                                    fieldName, databaseName, 
originalTableName));
+                                    fieldName, databaseName, tableName));
                     fieldName = fieldName.toLowerCase();
                 }
                 fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, 
precision, scale));
@@ -91,10 +87,6 @@ public class MySqlSchema {
         return databaseName;
     }
 
-    public String originalTableName() {
-        return originalTableName;
-    }
-
     public String tableName() {
         return tableName;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index cbf67c4c3..f442afe5c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -42,6 +42,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -53,6 +55,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * An {@link Action} which synchronize the whole MySQL database into one 
Paimon database.
  *
@@ -96,6 +100,8 @@ public class MySqlSyncDatabaseAction implements Action {
     private final String warehouse;
     private final String database;
     private final boolean ignoreIncompatible;
+    private final String tablePrefix;
+    private final String tableSuffix;
     private final Map<String, String> catalogConfig;
     private final Map<String, String> tableConfig;
 
@@ -106,16 +112,38 @@ public class MySqlSyncDatabaseAction implements Action {
             boolean ignoreIncompatible,
             Map<String, String> catalogConfig,
             Map<String, String> tableConfig) {
+        this(
+                mySqlConfig,
+                warehouse,
+                database,
+                ignoreIncompatible,
+                null,
+                null,
+                catalogConfig,
+                tableConfig);
+    }
+
+    MySqlSyncDatabaseAction(
+            Map<String, String> mySqlConfig,
+            String warehouse,
+            String database,
+            boolean ignoreIncompatible,
+            @Nullable String tablePrefix,
+            @Nullable String tableSuffix,
+            Map<String, String> catalogConfig,
+            Map<String, String> tableConfig) {
         this.mySqlConfig = Configuration.fromMap(mySqlConfig);
         this.warehouse = warehouse;
         this.database = database;
         this.ignoreIncompatible = ignoreIncompatible;
+        this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
+        this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
         this.catalogConfig = catalogConfig;
         this.tableConfig = tableConfig;
     }
 
     public void build(StreamExecutionEnvironment env) throws Exception {
-        Preconditions.checkArgument(
+        checkArgument(
                 !mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
                 MySqlSourceOptions.TABLE_NAME.key()
                         + " cannot be set for mysql-sync-database. "
@@ -129,31 +157,30 @@ public class MySqlSyncDatabaseAction implements Action {
         boolean caseSensitive = catalog.caseSensitive();
 
         if (!caseSensitive) {
-            Preconditions.checkArgument(
-                    database.equals(database.toLowerCase()),
-                    String.format(
-                            "Database name [%s] cannot contain upper case in 
case-insensitive catalog.",
-                            database));
+            validateCaseInsensitive();
         }
 
         List<MySqlSchema> mySqlSchemas = getMySqlSchemaList(caseSensitive);
-        Preconditions.checkArgument(
+        checkArgument(
                 mySqlSchemas.size() > 0,
                 "No tables found in MySQL database "
                         + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
                         + ", or MySQL database does not exist.");
 
         catalog.createDatabase(database, true);
+        TableNameConverter tableNameConverter =
+                new TableNameConverter(caseSensitive, tablePrefix, 
tableSuffix);
 
         List<FileStoreTable> fileStoreTables = new ArrayList<>();
         List<String> monitoredTables = new ArrayList<>();
         for (MySqlSchema mySqlSchema : mySqlSchemas) {
-            Identifier identifier = new Identifier(database, 
mySqlSchema.tableName());
+            String paimonTableName = 
tableNameConverter.convert(mySqlSchema.tableName());
+            Identifier identifier = new Identifier(database, paimonTableName);
             FileStoreTable table;
             try {
                 table = (FileStoreTable) catalog.getTable(identifier);
                 if (shouldMonitorTable(table.schema(), mySqlSchema, 
identifier)) {
-                    monitoredTables.add(mySqlSchema.originalTableName());
+                    monitoredTables.add(mySqlSchema.tableName());
                 }
             } catch (Catalog.TableNotExistException e) {
                 Schema schema =
@@ -164,7 +191,7 @@ public class MySqlSyncDatabaseAction implements Action {
                                 tableConfig);
                 catalog.createTable(identifier, schema, false);
                 table = (FileStoreTable) catalog.getTable(identifier);
-                monitoredTables.add(mySqlSchema.originalTableName());
+                monitoredTables.add(mySqlSchema.tableName());
             }
             fileStoreTables.add(table);
         }
@@ -181,7 +208,7 @@ public class MySqlSyncDatabaseAction implements Action {
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
         EventParser.Factory<String> parserFactory =
-                () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive);
+                () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, 
tableNameConverter);
 
         FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
                 new FlinkCdcSyncDatabaseSinkBuilder<String>()
@@ -197,6 +224,24 @@ public class MySqlSyncDatabaseAction implements Action {
         sinkBuilder.build();
     }
 
+    private void validateCaseInsensitive() {
+        checkArgument(
+                database.equals(database.toLowerCase()),
+                String.format(
+                        "Database name [%s] cannot contain upper case in 
case-insensitive catalog.",
+                        database));
+        checkArgument(
+                tablePrefix.equals(tablePrefix.toLowerCase()),
+                String.format(
+                        "Table prefix [%s] cannot contain upper case in 
case-insensitive catalog.",
+                        tablePrefix));
+        checkArgument(
+                tableSuffix.equals(tableSuffix.toLowerCase()),
+                String.format(
+                        "Table suffix [%s] cannot contain upper case in 
case-insensitive catalog.",
+                        tablePrefix));
+    }
+
     private List<MySqlSchema> getMySqlSchemaList(boolean caseSensitive) throws 
Exception {
         String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
         List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
@@ -230,7 +275,7 @@ public class MySqlSyncDatabaseAction implements Action {
                     identifier.getFullName(),
                     tableSchema.fields(),
                     mySqlSchema.databaseName(),
-                    mySqlSchema.originalTableName(),
+                    mySqlSchema.tableName(),
                     mySqlSchema.fields());
             return false;
         } else {
@@ -244,7 +289,7 @@ public class MySqlSyncDatabaseAction implements Action {
                             identifier.getFullName(),
                             tableSchema.fields(),
                             mySqlSchema.databaseName(),
-                            mySqlSchema.originalTableName(),
+                            mySqlSchema.tableName(),
                             mySqlSchema.fields()));
         }
     }
@@ -264,6 +309,8 @@ public class MySqlSyncDatabaseAction implements Action {
         String warehouse = params.get("warehouse");
         String database = params.get("database");
         boolean ignoreIncompatible = 
Boolean.parseBoolean(params.get("ignore-incompatible"));
+        String tablePrefix = params.get("table-prefix");
+        String tableSuffix = params.get("table-suffix");
 
         Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
         Map<String, String> catalogConfig = getConfigMap(params, 
"catalog-conf");
@@ -278,6 +325,8 @@ public class MySqlSyncDatabaseAction implements Action {
                         warehouse,
                         database,
                         ignoreIncompatible,
+                        tablePrefix,
+                        tableSuffix,
                         catalogConfig == null ? Collections.emptyMap() : 
catalogConfig,
                         tableConfig == null ? Collections.emptyMap() : 
tableConfig));
     }
@@ -314,7 +363,9 @@ public class MySqlSyncDatabaseAction implements Action {
         System.out.println("Syntax:");
         System.out.println(
                 "  mysql-sync-database --warehouse <warehouse-path> --database 
<database-name> "
-                        + "[--ignore-incompatible <true/false>]"
+                        + "[--ignore-incompatible <true/false>] "
+                        + "[--table-prefix <paimon-table-prefix>] "
+                        + "[--table-suffix <paimon-table-suffix>] "
                         + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -324,6 +375,13 @@ public class MySqlSyncDatabaseAction implements Action {
                 "--ignore-incompatible is default false, in this case, if 
MySQL table name exists in Paimon "
                         + "and their schema is incompatible, an exception will 
be thrown. "
                         + "You can specify it to true explicitly to ignore the 
incompatible tables and exception.");
+        System.out.println();
+
+        System.out.println(
+                "--table-prefix is the prefix of all Paimon tables to be 
synchronized. For example, if you want all "
+                        + "synchronized tables to have \"ods_\" as prefix, you 
can specify `--table-prefix ods_`.");
+        System.out.println("The usage of --table-suffix is same as 
`--table-prefix`");
+        System.out.println();
 
         System.out.println("MySQL CDC source conf syntax:");
         System.out.println("  key=value");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/TableNameConverter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/TableNameConverter.java
new file mode 100644
index 000000000..94996b48a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/TableNameConverter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import java.io.Serializable;
+
+/** Used to convert a MySQL source table name to corresponding Paimon table 
name. */
+public class TableNameConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean caseSensitive;
+    private final String prefix;
+    private final String suffix;
+
+    TableNameConverter(boolean caseSensitive) {
+        this(caseSensitive, "", "");
+    }
+
+    TableNameConverter(boolean caseSensitive, String prefix, String suffix) {
+        this.caseSensitive = caseSensitive;
+        this.prefix = prefix;
+        this.suffix = suffix;
+    }
+
+    public String convert(String originName) {
+        String tableName = caseSensitive ? originName : 
originName.toLowerCase();
+        return prefix + tableName + suffix;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 5d5e021c0..7b0f5a581 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -340,6 +340,162 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         }
     }
 
+    @Test
+    public void testTableAffix() throws Exception {
+        // create table t1
+        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        catalog.createDatabase(database, true);
+        Identifier identifier = Identifier.create(database, 
"test_prefix_t1_test_suffix");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("k1", DataTypes.INT().notNull())
+                        .column("v0", DataTypes.VARCHAR(10))
+                        .primaryKey("k1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+
+        // try synchronization
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "paimon_sync_database_affix");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 
1));
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        "test_prefix_",
+                        "_test_suffix",
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            testTableAffixImpl(statement);
+        }
+    }
+
+    private void testTableAffixImpl(Statement statement) throws Exception {
+        FileStoreTable table1 = 
getFileStoreTable("test_prefix_t1_test_suffix");
+        FileStoreTable table2 = 
getFileStoreTable("test_prefix_t2_test_suffix");
+
+        statement.executeUpdate("USE paimon_sync_database_affix");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two')");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four')");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"k1", "v0"});
+        List<String> primaryKeys1 = Collections.singletonList("k1");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"k2", "v0"});
+        List<String> primaryKeys2 = Collections.singletonList("k2");
+        expected = Arrays.asList("+I[2, two]", "+I[4, four]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 INT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v1 VARCHAR(10)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 's_6')");
+        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 's_8')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.INT()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"k2", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, NULL]",
+                        "+I[4, four, NULL]",
+                        "+I[6, six, s_6]",
+                        "+I[8, eight, s_8]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v1 BIGINT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 
9000000000000)");
+        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v1 VARCHAR(20)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 
'long_s_10')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]",
+                        "+I[9, nine, 9000000000000]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), 
DataTypes.VARCHAR(20)
+                        },
+                        new String[] {"k2", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, NULL]",
+                        "+I[4, four, NULL]",
+                        "+I[6, six, s_6]",
+                        "+I[8, eight, s_8]",
+                        "+I[10, ten, long_s_10]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 16876ebf7..bd3eb309b 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -308,3 +308,23 @@ CREATE TABLE compatible (
     v2 BIGINT,
     PRIMARY KEY (k1, k2)
 );
+
+-- 
################################################################################
+--  MySqlSyncDatabaseActionITCase#testTableAffix
+-- 
################################################################################
+
+CREATE DATABASE paimon_sync_database_affix;
+USE paimon_sync_database_affix;
+
+CREATE TABLE t1 (
+    k1 INT,
+    v0 VARCHAR(10),
+    PRIMARY KEY (k1)
+);
+
+CREATE TABLE t2 (
+    k2 INT,
+    v0 VARCHAR(10),
+    PRIMARY KEY (k2)
+);
+

Reply via email to