This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 26bb97607 [flink][bug] Fix bug that running MySQL actions on existing
tables throws schema unmatch exception (#961)
26bb97607 is described below
commit 26bb9760733533a2b772c64a5eb63c293dfb9d68
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 20 17:36:46 2023 +0800
[flink][bug] Fix bug that running MySQL actions on existing tables throws
schema unmatch exception (#961)
---
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 4 ++++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 26 +++++++++++++++++-----
.../src/test/resources/mysql/setup.sql | 5 +++--
3 files changed, 27 insertions(+), 8 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 44f748dfe..5a867c44f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -172,6 +172,10 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
public static ConvertAction canConvert(DataType oldType, DataType newType)
{
+ if (oldType.equalsIgnoreNullable(newType)) {
+ return ConvertAction.CONVERT;
+ }
+
int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 63128e9a2..b2f845177 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -341,6 +341,14 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
@Test
@Timeout(30)
public void testAllTypes() throws Exception {
+ // the first round checks for table creation
+ // the second round checks for running the action on an existing table
+ for (int i = 0; i < 2; i++) {
+ testAllTypesImpl();
+ }
+ }
+
+ private void testAllTypesImpl() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "all_types_table");
@@ -356,17 +364,18 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
warehouse,
database,
tableName,
- Collections.emptyList(),
- Collections.emptyList(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "_id"),
Collections.emptyMap(),
Collections.emptyMap());
action.build(env);
- env.executeAsync();
+ JobClient jobClient = env.executeAsync();
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(), // _id
+ DataTypes.DECIMAL(2, 1).notNull(), // pt
DataTypes.BOOLEAN(), // _boolean
DataTypes.TINYINT(), // _tinyint
DataTypes.SMALLINT(), // _tinyint_unsigned
@@ -421,6 +430,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
},
new String[] {
"_id",
+ "pt",
"_boolean",
"_tinyint",
"_tinyint_unsigned",
@@ -477,7 +487,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
List<String> expected =
Arrays.asList(
"+I["
- + "1, true, 1, 2, 3, "
+ + "1, 1.1, "
+ + "true, 1, 2, 3, "
+ "1000, 2000, 3000, "
+ "100000, 200000, 300000, "
+ "1000000, 2000000, 3000000, "
@@ -499,7 +510,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
+ "[118, 101, 114, 121, 32, 108, 111, 110,
103, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97]"
+ "]",
"+I["
- + "2, NULL, NULL, NULL, NULL, "
+ + "2, 2.2, "
+ + "NULL, NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
@@ -518,7 +530,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL"
+ "]");
- waitForResult(expected, table, rowType,
Collections.singletonList("_id"));
+ waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
+
+ jobClient.cancel().get();
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index f2e125c89..715263831 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -51,6 +51,7 @@ CREATE TABLE schema_evolution_multiple (
CREATE TABLE all_types_table (
_id INT,
+ pt DECIMAL(2, 1),
-- TINYINT
_boolean TINYINT(1),
_tinyint TINYINT,
@@ -124,7 +125,7 @@ CREATE TABLE all_types_table (
);
INSERT INTO all_types_table VALUES (
- 1,
+ 1, 1.1,
-- TINYINT
true, 1, 2, 3,
-- SMALLINT
@@ -162,7 +163,7 @@ INSERT INTO all_types_table VALUES (
-- BINARY
'bytes', 'more bytes', 'very long bytes test data'
), (
- 2,
+ 2, 2.2,
NULL, NULL, NULL, NULL,
NULL, NULL, NULL,
NULL, NULL, NULL,