This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 26bb97607 [flink][bug] Fix bug that running MySQL actions on existing tables throws schema unmatch exception (#961)
26bb97607 is described below

commit 26bb9760733533a2b772c64a5eb63c293dfb9d68
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 20 17:36:46 2023 +0800

    [flink][bug] Fix bug that running MySQL actions on existing tables throws schema unmatch exception (#961)
---
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  4 ++++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 26 +++++++++++++++++-----
 .../src/test/resources/mysql/setup.sql             |  5 +++--
 3 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 44f748dfe..5a867c44f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -172,6 +172,10 @@ public class UpdatedDataFieldsProcessFunction extends ProcessFunction<List<DataF
             Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
 
     public static ConvertAction canConvert(DataType oldType, DataType newType) {
+        if (oldType.equalsIgnoreNullable(newType)) {
+            return ConvertAction.CONVERT;
+        }
+
         int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
         int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 63128e9a2..b2f845177 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -341,6 +341,14 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
     @Test
     @Timeout(30)
     public void testAllTypes() throws Exception {
+        // the first round checks for table creation
+        // the second round checks for running the action on an existing table
+        for (int i = 0; i < 2; i++) {
+            testAllTypesImpl();
+        }
+    }
+
+    private void testAllTypesImpl() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "all_types_table");
@@ -356,17 +364,18 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         warehouse,
                         database,
                         tableName,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "_id"),
                         Collections.emptyMap(),
                         Collections.emptyMap());
         action.build(env);
-        env.executeAsync();
+        JobClient jobClient = env.executeAsync();
 
         RowType rowType =
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(), // _id
+                            DataTypes.DECIMAL(2, 1).notNull(), // pt
                             DataTypes.BOOLEAN(), // _boolean
                             DataTypes.TINYINT(), // _tinyint
                             DataTypes.SMALLINT(), // _tinyint_unsigned
@@ -421,6 +430,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         },
                         new String[] {
                             "_id",
+                            "pt",
                             "_boolean",
                             "_tinyint",
                             "_tinyint_unsigned",
@@ -477,7 +487,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         List<String> expected =
                 Arrays.asList(
                         "+I["
-                                + "1, true, 1, 2, 3, "
+                                + "1, 1.1, "
+                                + "true, 1, 2, 3, "
                                 + "1000, 2000, 3000, "
                                 + "100000, 200000, 300000, "
                                 + "1000000, 2000000, 3000000, "
@@ -499,7 +510,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "[118, 101, 114, 121, 32, 108, 111, 110, 103, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97]"
                                 + "]",
                         "+I["
-                                + "2, NULL, NULL, NULL, NULL, "
+                                + "2, 2.2, "
+                                + "NULL, NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
@@ -518,7 +530,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL"
                                 + "]");
-        waitForResult(expected, table, rowType, Collections.singletonList("_id"));
+        waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
+
+        jobClient.cancel().get();
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index f2e125c89..715263831 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -51,6 +51,7 @@ CREATE TABLE schema_evolution_multiple (
 
 CREATE TABLE all_types_table (
     _id INT,
+    pt DECIMAL(2, 1),
     -- TINYINT
     _boolean TINYINT(1),
     _tinyint TINYINT,
@@ -124,7 +125,7 @@ CREATE TABLE all_types_table (
 );
 
 INSERT INTO all_types_table VALUES (
-    1,
+    1, 1.1,
     -- TINYINT
     true, 1, 2, 3,
     -- SMALLINT
@@ -162,7 +163,7 @@ INSERT INTO all_types_table VALUES (
     -- BINARY
     'bytes', 'more bytes', 'very long bytes test data'
 ), (
-    2,
+    2, 2.2,
     NULL, NULL, NULL, NULL,
     NULL, NULL, NULL,
     NULL, NULL, NULL,

Reply via email to