This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c729cab7 [cdc] mysql cdc does not obtain whether the original field 
data type is allowed to be null (#2012)
5c729cab7 is described below

commit 5c729cab7294f108c7889c60c178f3e8f466abe1
Author: Kerwin <[email protected]>
AuthorDate: Fri Sep 22 18:20:40 2023 +0800

    [cdc] mysql cdc does not obtain whether the original field data type is 
allowed to be null (#2012)
---
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 13 +++++++----
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  |  7 +++---
 .../flink/action/cdc/mysql/schema/MySqlSchema.java | 27 +++++++++++++++++++++-
 .../cdc/mysql/MySqlCdcTypeMappingITCase.java       |  6 ++---
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  8 +++----
 5 files changed, 45 insertions(+), 16 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index d6a807652..fd6c23e0d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -214,19 +214,22 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             JsonNode column = columns.get(i);
             JsonNode length = column.get("length");
             JsonNode scale = column.get("scale");
-            DataType type =
+            DataType dataType =
                     MySqlTypeUtils.toDataType(
                             column.get("typeName").asText(),
                             length == null ? null : length.asInt(),
                             scale == null ? null : scale.asInt(),
                             typeMapping);
 
-            if (!typeMapping.containsMode(TO_NULLABLE)) {
-                type = type.copy(column.get("optional").asBoolean());
-            }
+            dataType =
+                    dataType.copy(
+                            typeMapping.containsMode(TO_NULLABLE)
+                                    || column.get("optional").asBoolean());
 
             String fieldName = column.get("name").asText();
-            result.add(new DataField(i, caseSensitive ? fieldName : 
fieldName.toLowerCase(), type));
+            result.add(
+                    new DataField(
+                            i, caseSensitive ? fieldName : 
fieldName.toLowerCase(), dataType));
         }
         return result;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 888bbaf73..aec731fb2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -68,9 +68,10 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
                             scale == null ? null : scale.asInt(),
                             typeMapping);
 
-            if (!typeMapping.containsMode(TO_NULLABLE)) {
-                dataType.copy(element.get("optional").asBoolean());
-            }
+            dataType =
+                    dataType.copy(
+                            typeMapping.containsMode(TO_NULLABLE)
+                                    || element.get("optional").asBoolean());
 
             // TODO : add table comment and column comment when we upgrade 
flink cdc to 2.4
             fields.put(element.get("name").asText(), dataType);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 310fe53ed..044fb4c3e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -24,6 +24,9 @@ import 
org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.Pair;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -33,9 +36,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+
 /** Utility class to load MySQL table schema with JDBC. */
 public class MySqlSchema {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSchema.class);
+
     private final LinkedHashMap<String, Pair<DataType, String>> fields;
     private final List<String> primaryKeys;
 
@@ -70,7 +77,12 @@ public class MySqlSchema {
                 DataType paimonType =
                         MySqlTypeUtils.toDataType(fieldType, precision, scale, 
typeMapping);
 
-                fields.put(fieldName, Pair.of(paimonType, fieldComment));
+                boolean isNullable =
+                        typeMapping.containsMode(TO_NULLABLE)
+                                || 
isNullableColumn(rs.getString("IS_NULLABLE"));
+                DataType updatePaimonType = paimonType.copy(isNullable);
+
+                fields.put(fieldName, Pair.of(updatePaimonType, fieldComment));
             }
         }
 
@@ -145,4 +157,17 @@ public class MySqlSchema {
                         .collect(Collectors.joining(","))
                 + "]";
     }
+
+    private static boolean isNullableColumn(final String value) {
+        if ("YES".equals(value)) {
+            return true;
+        }
+
+        if ("NO".equals(value)) {
+            return false;
+        }
+
+        LOG.error("Unrecognized nullable value: " + value);
+        return true;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
index 8f173dd8d..e1b77d293 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -133,8 +133,9 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
                 IntStream.range(0, allTypeNums)
                         .mapToObj(i -> DataTypes.STRING())
                         .toArray(DataType[]::new);
-        types[0] = types[0].notNull();
-        types[1] = types[1].notNull();
+        types[0] = types[0].notNull(); // id
+        types[1] = types[1].notNull(); // pt
+        types[22] = types[22].notNull(); // _serial SERIAL
 
         RowType rowType =
                 RowType.of(
@@ -297,7 +298,6 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
                                 + "NULL, "
                                 + "NULL"
                                 + "]");
-
         waitForResult(expected, getFileStoreTable(tableName), rowType, 
Arrays.asList("pt", "_id"));
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index d0e725c1e..2fe0a4eb1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -363,7 +363,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.BIGINT(), // _bigint
                             DataTypes.DECIMAL(20, 0), // _bigint_unsigned
                             DataTypes.DECIMAL(20, 0), // 
_bigint_unsigned_zerofill
-                            DataTypes.DECIMAL(20, 0), // _serial
+                            DataTypes.DECIMAL(20, 0).notNull(), // _serial
                             DataTypes.FLOAT(), // _float
                             DataTypes.FLOAT(), // _float_unsigned
                             DataTypes.FLOAT(), // _float_unsigned_zerofill
@@ -621,8 +621,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         anyCauseMatches(
                                 IllegalArgumentException.class,
                                 "Column v1 have different types when merging 
schemas.\n"
-                                        + "Current table 
'{paimon_sync_table.incompatible_field_2}' fields: [_id INT,v1 INT]\n"
-                                        + "To be merged table 
'paimon_sync_table.incompatible_field_1' fields: [_id INT,v1 TIMESTAMP(0)]"));
+                                        + "Current table 
'{paimon_sync_table.incompatible_field_2}' fields: [_id INT NOT NULL,v1 INT]\n"
+                                        + "To be merged table 
'paimon_sync_table.incompatible_field_1' fields: [_id INT NOT NULL,v1 
TIMESTAMP(0)]"));
     }
 
     @Test
@@ -761,7 +761,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.STRING(),
                             DataTypes.STRING(),
                             DataTypes.STRING(),
-                            DataTypes.INT()
+                            DataTypes.INT().notNull()
                         },
                         new String[] {
                             "pk",

Reply via email to