This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f51b576 [FLINK-36999][cdc-source-connectors] When the source field type is decimal and the value is null, a null pointer exception occurs during deserialization
f3f51b576 is described below

commit f3f51b576b4c59bec89770c89945636940019947
Author: linjianchang <[email protected]>
AuthorDate: Thu May 15 20:49:40 2025 +0800

    [FLINK-36999][cdc-source-connectors] When the source field type is decimal and the value is null, a null pointer exception occurs during deserialization
    
    Co-authored-by: linjc13 <[email protected]>
    Co-authored-by: gongzhongqiang <[email protected]>
---
 .../flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java   | 4 ++++
 .../flink/cdc/connectors/oracle/table/OracleConnectorITCase.java    | 6 +++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
index 61c55ef8e..3afa7128d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java
@@ -188,6 +188,10 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,
     protected DataType inferStruct(Object value, Schema schema) {
         Struct struct = (Struct) value;
         if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+            if (struct == null) {
+                // set the default value
+                return DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, 
DecimalType.DEFAULT_SCALE);
+            }
             SpecialValueDecimal decimal = 
VariableScaleDecimal.toLogical(struct);
             BigDecimal bigDecimal = 
decimal.getDecimalValue().orElse(BigDecimal.ZERO);
             return DataTypes.DECIMAL(bigDecimal.precision(), 
bigDecimal.scale());
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
index 08c0c9bac..6eaa38c97 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java
@@ -638,6 +638,9 @@ class OracleConnectorITCase {
             statement.execute(
                     "INSERT INTO debezium.test_numeric_table "
                             + "VALUES (11000000001, 1, 99, 9999, 987654321, 
20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965)");
+            statement.execute(
+                    "INSERT INTO debezium.test_numeric_table "
+                            + "VALUES (11000000002, NULL, NULL, NULL, NULL, 
NULL, NULL, NULL, NULL, NULL)");
         }
 
         String sourceDDL =
@@ -709,7 +712,8 @@ class OracleConnectorITCase {
         List<String> expected =
                 Arrays.asList(
                         "+I[11000000000, false, 98, 9998, 987654320, 
20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]",
-                        "+I[11000000001, true, 99, 9999, 987654321, 
20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]");
+                        "+I[11000000001, true, 99, 9999, 987654321, 
20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]",
+                        "+I[11000000002, null, null, null, null, null, null, 
null, null, null]");
 
         List<String> actual = 
TestValuesTableFactory.getRawResultsAsStrings("test_numeric_sink");
         
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);

Reply via email to