This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 93e5547d6 [FLINK-38520][postgres] Postgres YAML CDC support array with 
null element (#4254)
93e5547d6 is described below

commit 93e5547d6b3ef6a0a0b1b9bbba2c324c060ce564
Author: Jia Fan <[email protected]>
AuthorDate: Mon Feb 2 10:38:10 2026 +0800

    [FLINK-38520][postgres] Postgres YAML CDC support array with null element 
(#4254)
---
 .../postgres/source/PostgresFullTypesITCase.java   | 63 +++++++++++++++++++++-
 .../src/test/resources/ddl/column_type_test.sql    | 16 ++++++
 .../event/DebeziumEventDeserializationSchema.java  | 25 ++++-----
 3 files changed, 91 insertions(+), 13 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 0dac3c153..22bc89ce0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -910,10 +910,65 @@ public class PostgresFullTypesITCase extends 
PostgresTestBase {
         } catch (Exception e) {
             Assertions.assertThat(getRootCause(e))
                     .hasMessage(
-                            "Unable convert multidimensional array value 
'[null, null]' to a flat array.");
+                            "Unable to convert multidimensional array value 
'[null, null]' to a flat array.");
         }
     }
 
+    @Test
+    public void testArrayTypesWithNull() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                
.port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                
.databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.array_types_with_null")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new 
PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 
1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
+
+        Object[] actualSnapshotObjects = recordFields(snapshotRecord, 
ARRAY_TYPES_WITH_NULL);
+
+        Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1); // id 
column
+
+        // Test text array with null element: ARRAY['hello', NULL, 'world']
+        ArrayData actualTextArray = (ArrayData) actualSnapshotObjects[1];
+        Assertions.assertThat(actualTextArray.size()).isEqualTo(3);
+        Assertions.assertThat(actualTextArray.getString(0))
+                .isEqualTo(BinaryStringData.fromString("hello"));
+        Assertions.assertThat(actualTextArray.isNullAt(1)).isTrue();
+        Assertions.assertThat(actualTextArray.getString(2))
+                .isEqualTo(BinaryStringData.fromString("world"));
+
+        // Test integer array with null element: ARRAY[1, NULL, 3]
+        ArrayData actualIntArray = (ArrayData) actualSnapshotObjects[2];
+        Assertions.assertThat(actualIntArray.size()).isEqualTo(3);
+        Assertions.assertThat(actualIntArray.getInt(0)).isEqualTo(1);
+        Assertions.assertThat(actualIntArray.isNullAt(1)).isTrue();
+        Assertions.assertThat(actualIntArray.getInt(2)).isEqualTo(3);
+    }
+
     public Throwable getRootCause(Throwable throwable) {
         Throwable cause = throwable;
         while (cause.getCause() != null && cause.getCause() != cause) {
@@ -1064,6 +1119,12 @@ public class PostgresFullTypesITCase extends 
PostgresTestBase {
                     DataTypes.ARRAY(DataTypes.INT()),
                     DataTypes.ARRAY(DataTypes.INT()));
 
+    private static final RowType ARRAY_TYPES_WITH_NULL =
+            RowType.of(
+                    DataTypes.INT(),
+                    DataTypes.ARRAY(DataTypes.STRING()),
+                    DataTypes.ARRAY(DataTypes.INT()));
+
     private static final RowType ARRAY_TYPES_MATRIX =
             RowType.of(DataTypes.INT(), 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())));
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
index 81db1f248..a78561ef5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
@@ -173,6 +173,22 @@ VALUES
      '{42}'
     );
 
+CREATE TABLE array_types_with_null (
+                             id        SERIAL PRIMARY KEY,
+                             text_a1   TEXT[],
+                             int_a1    INTEGER[]
+);
+
+ALTER TABLE inventory.array_types_with_null
+    REPLICA IDENTITY FULL;
+
+INSERT INTO array_types_with_null (id, text_a1, int_a1)
+VALUES
+    (1,
+     ARRAY['hello', NULL, 'world'],
+     ARRAY[1, NULL, 3]
+    );
+
 CREATE TABLE array_types_unsupported_matrix (
                              id        SERIAL PRIMARY KEY,
                              matrix_a1 INTEGER[][]
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index c484709ff..a1eb76bcc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -504,6 +504,13 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
         }
 
         Schema elementSchema = schema.valueSchema();
+        // Multidimensional arrays are not supported
+        if (elementSchema.type() == Schema.Type.ARRAY) {
+            throw new IllegalArgumentException(
+                    "Unable to convert multidimensional array value '"
+                            + dbzObj
+                            + "' to a flat array.");
+        }
         DataType elementType = schemaDataTypeInference.infer(null, 
elementSchema);
         DeserializationRuntimeConverter elementConverter = 
getOrCreateConverter(elementType);
 
@@ -513,13 +520,10 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
 
             for (int i = 0; i < list.size(); i++) {
                 Object element = list.get(i);
-                if (element != null && elementSchema.type() != 
Schema.Type.ARRAY) {
-                    array[i] = elementConverter.convert(element, 
elementSchema);
+                if (element == null) {
+                    array[i] = null;
                 } else {
-                    throw new IllegalArgumentException(
-                            "Unable convert multidimensional array value '"
-                                    + dbzObj
-                                    + "' to a flat array.");
+                    array[i] = elementConverter.convert(element, 
elementSchema);
                 }
             }
 
@@ -529,13 +533,10 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
             Object[] convertedArray = new Object[inputArray.length];
 
             for (int i = 0; i < inputArray.length; i++) {
-                if (inputArray[i] != null && elementSchema.type() != 
Schema.Type.ARRAY) {
-                    convertedArray[i] = 
elementConverter.convert(inputArray[i], elementSchema);
+                if (inputArray[i] == null) {
+                    convertedArray[i] = null;
                 } else {
-                    throw new IllegalArgumentException(
-                            "Unable convert multidimensional array value '"
-                                    + dbzObj
-                                    + "' to a flat array.");
+                    convertedArray[i] = 
elementConverter.convert(inputArray[i], elementSchema);
                 }
             }
 

Reply via email to