This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 93e5547d6 [FLINK-38520][postgres] Postgres YAML CDC support array with
null element (#4254)
93e5547d6 is described below
commit 93e5547d6b3ef6a0a0b1b9bbba2c324c060ce564
Author: Jia Fan <[email protected]>
AuthorDate: Mon Feb 2 10:38:10 2026 +0800
[FLINK-38520][postgres] Postgres YAML CDC support array with null element
(#4254)
---
.../postgres/source/PostgresFullTypesITCase.java | 63 +++++++++++++++++++++-
.../src/test/resources/ddl/column_type_test.sql | 16 ++++++
.../event/DebeziumEventDeserializationSchema.java | 25 ++++-----
3 files changed, 91 insertions(+), 13 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 0dac3c153..22bc89ce0 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -910,10 +910,65 @@ public class PostgresFullTypesITCase extends
PostgresTestBase {
} catch (Exception e) {
Assertions.assertThat(getRootCause(e))
.hasMessage(
- "Unable convert multidimensional array value
'[null, null]' to a flat array.");
+ "Unable to convert multidimensional array value
'[null, null]' to a flat array.");
}
}
+ @Test
+ public void testArrayTypesWithNull() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+
.port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+
.databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.array_types_with_null")
+ .startupOptions(StartupOptions.initial())
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new
PostgresDataSource(configFactory).getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events,
1).f0;
+ RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(0)).after();
+
+ Object[] actualSnapshotObjects = recordFields(snapshotRecord,
ARRAY_TYPES_WITH_NULL);
+
+ Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1); // id
column
+
+ // Test text array with null element: ARRAY['hello', NULL, 'world']
+ ArrayData actualTextArray = (ArrayData) actualSnapshotObjects[1];
+ Assertions.assertThat(actualTextArray.size()).isEqualTo(3);
+ Assertions.assertThat(actualTextArray.getString(0))
+ .isEqualTo(BinaryStringData.fromString("hello"));
+ Assertions.assertThat(actualTextArray.isNullAt(1)).isTrue();
+ Assertions.assertThat(actualTextArray.getString(2))
+ .isEqualTo(BinaryStringData.fromString("world"));
+
+ // Test integer array with null element: ARRAY[1, NULL, 3]
+ ArrayData actualIntArray = (ArrayData) actualSnapshotObjects[2];
+ Assertions.assertThat(actualIntArray.size()).isEqualTo(3);
+ Assertions.assertThat(actualIntArray.getInt(0)).isEqualTo(1);
+ Assertions.assertThat(actualIntArray.isNullAt(1)).isTrue();
+ Assertions.assertThat(actualIntArray.getInt(2)).isEqualTo(3);
+ }
+
public Throwable getRootCause(Throwable throwable) {
Throwable cause = throwable;
while (cause.getCause() != null && cause.getCause() != cause) {
@@ -1064,6 +1119,12 @@ public class PostgresFullTypesITCase extends
PostgresTestBase {
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.INT()));
+ private static final RowType ARRAY_TYPES_WITH_NULL =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.ARRAY(DataTypes.INT()));
+
private static final RowType ARRAY_TYPES_MATRIX =
RowType.of(DataTypes.INT(),
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())));
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
index 81db1f248..a78561ef5 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
@@ -173,6 +173,22 @@ VALUES
'{42}'
);
+CREATE TABLE array_types_with_null (
+ id SERIAL PRIMARY KEY,
+ text_a1 TEXT[],
+ int_a1 INTEGER[]
+);
+
+ALTER TABLE inventory.array_types_with_null
+ REPLICA IDENTITY FULL;
+
+INSERT INTO array_types_with_null (id, text_a1, int_a1)
+VALUES
+ (1,
+ ARRAY['hello', NULL, 'world'],
+ ARRAY[1, NULL, 3]
+ );
+
CREATE TABLE array_types_unsupported_matrix (
id SERIAL PRIMARY KEY,
matrix_a1 INTEGER[][]
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index c484709ff..a1eb76bcc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -504,6 +504,13 @@ public abstract class DebeziumEventDeserializationSchema
extends SourceRecordEve
}
Schema elementSchema = schema.valueSchema();
+ // Multidimensional arrays are not supported
+ if (elementSchema.type() == Schema.Type.ARRAY) {
+ throw new IllegalArgumentException(
+ "Unable to convert multidimensional array value '"
+ + dbzObj
+ + "' to a flat array.");
+ }
DataType elementType = schemaDataTypeInference.infer(null,
elementSchema);
DeserializationRuntimeConverter elementConverter =
getOrCreateConverter(elementType);
@@ -513,13 +520,10 @@ public abstract class DebeziumEventDeserializationSchema
extends SourceRecordEve
for (int i = 0; i < list.size(); i++) {
Object element = list.get(i);
- if (element != null && elementSchema.type() !=
Schema.Type.ARRAY) {
- array[i] = elementConverter.convert(element,
elementSchema);
+ if (element == null) {
+ array[i] = null;
} else {
- throw new IllegalArgumentException(
- "Unable convert multidimensional array value '"
- + dbzObj
- + "' to a flat array.");
+ array[i] = elementConverter.convert(element,
elementSchema);
}
}
@@ -529,13 +533,10 @@ public abstract class DebeziumEventDeserializationSchema
extends SourceRecordEve
Object[] convertedArray = new Object[inputArray.length];
for (int i = 0; i < inputArray.length; i++) {
- if (inputArray[i] != null && elementSchema.type() !=
Schema.Type.ARRAY) {
- convertedArray[i] =
elementConverter.convert(inputArray[i], elementSchema);
+ if (inputArray[i] == null) {
+ convertedArray[i] = null;
} else {
- throw new IllegalArgumentException(
- "Unable convert multidimensional array value '"
- + dbzObj
- + "' to a flat array.");
+ convertedArray[i] =
elementConverter.convert(inputArray[i], elementSchema);
}
}