This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5c729cab7 [cdc] mysql cdc does not obtain whether the original field
data type is allowed to be null (#2012)
5c729cab7 is described below
commit 5c729cab7294f108c7889c60c178f3e8f466abe1
Author: Kerwin <[email protected]>
AuthorDate: Fri Sep 22 18:20:40 2023 +0800
[cdc] mysql cdc does not obtain whether the original field data type is
allowed to be null (#2012)
---
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 13 +++++++----
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 7 +++---
.../flink/action/cdc/mysql/schema/MySqlSchema.java | 27 +++++++++++++++++++++-
.../cdc/mysql/MySqlCdcTypeMappingITCase.java | 6 ++---
.../cdc/mysql/MySqlSyncTableActionITCase.java | 8 +++----
5 files changed, 45 insertions(+), 16 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index d6a807652..fd6c23e0d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -214,19 +214,22 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
JsonNode column = columns.get(i);
JsonNode length = column.get("length");
JsonNode scale = column.get("scale");
- DataType type =
+ DataType dataType =
MySqlTypeUtils.toDataType(
column.get("typeName").asText(),
length == null ? null : length.asInt(),
scale == null ? null : scale.asInt(),
typeMapping);
- if (!typeMapping.containsMode(TO_NULLABLE)) {
- type = type.copy(column.get("optional").asBoolean());
- }
+ dataType =
+ dataType.copy(
+ typeMapping.containsMode(TO_NULLABLE)
+ || column.get("optional").asBoolean());
String fieldName = column.get("name").asText();
- result.add(new DataField(i, caseSensitive ? fieldName :
fieldName.toLowerCase(), type));
+ result.add(
+ new DataField(
+ i, caseSensitive ? fieldName :
fieldName.toLowerCase(), dataType));
}
return result;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 888bbaf73..aec731fb2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -68,9 +68,10 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
scale == null ? null : scale.asInt(),
typeMapping);
- if (!typeMapping.containsMode(TO_NULLABLE)) {
- dataType.copy(element.get("optional").asBoolean());
- }
+ dataType =
+ dataType.copy(
+ typeMapping.containsMode(TO_NULLABLE)
+ || element.get("optional").asBoolean());
// TODO : add table comment and column comment when we upgrade
flink cdc to 2.4
fields.put(element.get("name").asText(), dataType);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 310fe53ed..044fb4c3e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -24,6 +24,9 @@ import
org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -33,9 +36,13 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+
/** Utility class to load MySQL table schema with JDBC. */
public class MySqlSchema {
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlSchema.class);
+
private final LinkedHashMap<String, Pair<DataType, String>> fields;
private final List<String> primaryKeys;
@@ -70,7 +77,12 @@ public class MySqlSchema {
DataType paimonType =
MySqlTypeUtils.toDataType(fieldType, precision, scale,
typeMapping);
- fields.put(fieldName, Pair.of(paimonType, fieldComment));
+ boolean isNullable =
+ typeMapping.containsMode(TO_NULLABLE)
+ ||
isNullableColumn(rs.getString("IS_NULLABLE"));
+ DataType updatePaimonType = paimonType.copy(isNullable);
+
+ fields.put(fieldName, Pair.of(updatePaimonType, fieldComment));
}
}
@@ -145,4 +157,17 @@ public class MySqlSchema {
.collect(Collectors.joining(","))
+ "]";
}
+
+    /**
+     * Interprets the {@code IS_NULLABLE} string read from the JDBC column metadata result set.
+     *
+     * <p>{@code "YES"} maps to nullable and {@code "NO"} to not-null. Any other value, including
+     * {@code null}, is logged and treated as nullable, so an unexpected value never produces a
+     * NOT NULL column.
+     *
+     * @param value raw {@code IS_NULLABLE} value; may be {@code null}
+     * @return {@code false} only when {@code value} is exactly {@code "NO"}; {@code true} otherwise
+     */
+    private static boolean isNullableColumn(final String value) {
+        if ("YES".equals(value)) {
+            return true;
+        }
+
+        if ("NO".equals(value)) {
+            return false;
+        }
+
+        // Parameterized logging avoids building the message by hand; the emitted text is unchanged.
+        // NOTE(review): JDBC's DatabaseMetaData#getColumns documents an empty string as
+        // "nullability unknown" - confirm whether ERROR is the right level for that case.
+        LOG.error("Unrecognized nullable value: {}", value);
+        return true;
+    }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
index 8f173dd8d..e1b77d293 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -133,8 +133,9 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
IntStream.range(0, allTypeNums)
.mapToObj(i -> DataTypes.STRING())
.toArray(DataType[]::new);
- types[0] = types[0].notNull();
- types[1] = types[1].notNull();
+ types[0] = types[0].notNull(); // id
+ types[1] = types[1].notNull(); // pt
+ types[22] = types[22].notNull(); // _serial SERIAL
RowType rowType =
RowType.of(
@@ -297,7 +298,6 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
+ "NULL, "
+ "NULL"
+ "]");
-
waitForResult(expected, getFileStoreTable(tableName), rowType,
Arrays.asList("pt", "_id"));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index d0e725c1e..2fe0a4eb1 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -363,7 +363,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.BIGINT(), // _bigint
DataTypes.DECIMAL(20, 0), // _bigint_unsigned
DataTypes.DECIMAL(20, 0), //
_bigint_unsigned_zerofill
- DataTypes.DECIMAL(20, 0), // _serial
+ DataTypes.DECIMAL(20, 0).notNull(), // _serial
DataTypes.FLOAT(), // _float
DataTypes.FLOAT(), // _float_unsigned
DataTypes.FLOAT(), // _float_unsigned_zerofill
@@ -621,8 +621,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
anyCauseMatches(
IllegalArgumentException.class,
"Column v1 have different types when merging
schemas.\n"
- + "Current table
'{paimon_sync_table.incompatible_field_2}' fields: [_id INT,v1 INT]\n"
- + "To be merged table
'paimon_sync_table.incompatible_field_1' fields: [_id INT,v1 TIMESTAMP(0)]"));
+ + "Current table
'{paimon_sync_table.incompatible_field_2}' fields: [_id INT NOT NULL,v1 INT]\n"
+ + "To be merged table
'paimon_sync_table.incompatible_field_1' fields: [_id INT NOT NULL,v1
TIMESTAMP(0)]"));
}
@Test
@@ -761,7 +761,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
- DataTypes.INT()
+ DataTypes.INT().notNull()
},
new String[] {
"pk",