This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new f3f51b576 [FLINK-36999][cdc-source-connectors] When the source field
type is decimal and the value is null, a null pointer exception occurs during
deserialization
f3f51b576 is described below
commit f3f51b576b4c59bec89770c89945636940019947
Author: linjianchang <[email protected]>
AuthorDate: Thu May 15 20:49:40 2025 +0800
[FLINK-36999][cdc-source-connectors] When the source field type is decimal
and the value is null, a null pointer exception occurs during deserialization
Co-authored-by: linjc13 <[email protected]>
Co-authored-by: gongzhongqiang <[email protected]>
---
.../flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java | 4 ++++
.../flink/cdc/connectors/oracle/table/OracleConnectorITCase.java | 6 +++++-
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
index 61c55ef8e..3afa7128d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
@@ -188,6 +188,10 @@ public class DebeziumSchemaDataTypeInference implements
SchemaDataTypeInference,
protected DataType inferStruct(Object value, Schema schema) {
Struct struct = (Struct) value;
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+ if (struct == null) {
+ // set the default value
+ return DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION,
DecimalType.DEFAULT_SCALE);
+ }
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical(struct);
BigDecimal bigDecimal =
decimal.getDecimalValue().orElse(BigDecimal.ZERO);
return DataTypes.DECIMAL(bigDecimal.precision(),
bigDecimal.scale());
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
index 08c0c9bac..6eaa38c97 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
@@ -638,6 +638,9 @@ class OracleConnectorITCase {
statement.execute(
"INSERT INTO debezium.test_numeric_table "
+ "VALUES (11000000001, 1, 99, 9999, 987654321,
20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965)");
+ statement.execute(
+ "INSERT INTO debezium.test_numeric_table "
+ + "VALUES (11000000002, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL)");
}
String sourceDDL =
@@ -709,7 +712,8 @@ class OracleConnectorITCase {
List<String> expected =
Arrays.asList(
"+I[11000000000, false, 98, 9998, 987654320,
20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]",
- "+I[11000000001, true, 99, 9999, 987654321,
20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]");
+ "+I[11000000001, true, 99, 9999, 987654321,
20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]",
+ "+I[11000000002, null, null, null, null, null, null,
null, null, null]");
List<String> actual =
TestValuesTableFactory.getRawResultsAsStrings("test_numeric_sink");
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);