This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 70e6a6d2c [hofix] Fix Mysql Sync table with computed columns to exists 
paimon table
70e6a6d2c is described below

commit 70e6a6d2c3ae4fbda026a00e43abeded530f0707
Author: Jingsong <[email protected]>
AuthorDate: Fri Jul 21 21:00:17 2023 +0800

    [hofix] Fix Mysql Sync table with computed columns to exists paimon table
---
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  16 ++-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 136 +++++++++++----------
 2 files changed, 86 insertions(+), 66 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index fef0bdb72..e85d1121a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -165,9 +166,18 @@ public class MySqlSyncTableAction extends ActionBase {
                         caseSensitive);
         try {
             table = (FileStoreTable) catalog.getTable(identifier);
-            checkArgument(
-                    computedColumnArgs.isEmpty(),
-                    "Cannot add computed column when table already exists.");
+            if (computedColumns.size() > 0) {
+                List<String> computedFields =
+                        computedColumns.stream()
+                                .map(ComputedColumn::columnName)
+                                .collect(Collectors.toList());
+                List<String> fieldNames = table.schema().fieldNames();
+                checkArgument(
+                        fieldNames.containsAll(computedFields),
+                        " Exists Table should contain all computed columns %s, 
but are %s.",
+                        computedFields,
+                        fieldNames);
+            }
             MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
         } catch (Catalog.TableNotExistException e) {
             catalog.createTable(identifier, fromMySql, false);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 033c0043b..94cda55eb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -772,8 +772,16 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
     }
 
     @Test
-    @Timeout(30)
+    @Timeout(60)
     public void testComputedColumn() throws Exception {
+        // the first round checks for table creation
+        // the second round checks for running the action on an existing table
+        for (int i = 0; i < 2; i++) {
+            innerTestComputedColumn(i == 0);
+        }
+    }
+
+    private void innerTestComputedColumn(boolean executeMysql) throws 
Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "test_computed_column");
@@ -816,69 +824,71 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         JobClient client = env.executeAsync();
         waitJobRunning(client);
 
-        try (Connection conn =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
-                                MYSQL_CONTAINER.getUsername(),
-                                MYSQL_CONTAINER.getPassword());
-                Statement statement = conn.createStatement()) {
-            statement.execute("USE paimon_sync_table");
-            statement.executeUpdate(
-                    "INSERT INTO test_computed_column VALUES (1, '2023-03-23', 
'2022-01-01 14:30', '2021-09-15 15:00:10')");
-            statement.executeUpdate(
-                    "INSERT INTO test_computed_column VALUES (2, '2023-03-23', 
null, null)");
-
-            FileStoreTable table = getFileStoreTable();
-            RowType rowType =
-                    RowType.of(
-                            new DataType[] {
-                                DataTypes.INT().notNull(),
-                                DataTypes.DATE(),
-                                DataTypes.TIMESTAMP(0),
-                                DataTypes.TIMESTAMP(0),
-                                DataTypes.INT().notNull(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.INT(),
-                                DataTypes.STRING(),
-                                DataTypes.STRING(),
-                                DataTypes.INT()
-                            },
-                            new String[] {
-                                "pk",
-                                "_date",
-                                "_datetime",
-                                "_timestamp",
-                                "_year_date",
-                                "_year_datetime",
-                                "_year_timestamp",
-                                "_month_date",
-                                "_month_datetime",
-                                "_month_timestamp",
-                                "_day_date",
-                                "_day_datetime",
-                                "_day_timestamp",
-                                "_hour_date",
-                                "_hour_datetime",
-                                "_hour_timestamp",
-                                "_substring_date1",
-                                "_substring_date2",
-                                "_truncate_date"
-                            });
-            List<String> expected =
-                    Arrays.asList(
-                            "+I[1, 19439, 2022-01-01T14:30, 
2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 23-03-23, 
09-15, 0]",
-                            "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, 
NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 23-03-23, NULL, 2]");
-            waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
+        if (executeMysql) {
+            try (Connection conn =
+                            DriverManager.getConnection(
+                                    MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                                    MYSQL_CONTAINER.getUsername(),
+                                    MYSQL_CONTAINER.getPassword());
+                    Statement statement = conn.createStatement()) {
+                statement.execute("USE paimon_sync_table");
+                statement.executeUpdate(
+                        "INSERT INTO test_computed_column VALUES (1, 
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
+                statement.executeUpdate(
+                        "INSERT INTO test_computed_column VALUES (2, 
'2023-03-23', null, null)");
+            }
         }
+
+        FileStoreTable table = getFileStoreTable();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.DATE(),
+                            DataTypes.TIMESTAMP(0),
+                            DataTypes.TIMESTAMP(0),
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.INT()
+                        },
+                        new String[] {
+                            "pk",
+                            "_date",
+                            "_datetime",
+                            "_timestamp",
+                            "_year_date",
+                            "_year_datetime",
+                            "_year_timestamp",
+                            "_month_date",
+                            "_month_datetime",
+                            "_month_timestamp",
+                            "_day_date",
+                            "_day_datetime",
+                            "_day_timestamp",
+                            "_hour_date",
+                            "_hour_datetime",
+                            "_hour_timestamp",
+                            "_substring_date1",
+                            "_substring_date2",
+                            "_truncate_date"
+                        });
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 23-03-23, 09-15, 0]",
+                        "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, 
NULL, 23, NULL, NULL, 0, NULL, NULL, 23-03-23, NULL, 2]");
+        waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
     }
 
     @Test

Reply via email to