This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7b926ed94 [FLINK-38514][postgres] Add support for UUID array type in PostgreSQL CDC connector (#4255)
7b926ed94 is described below
commit 7b926ed943ab303df541e2526d1d4e469eced8dc
Author: Jia Fan <[email protected]>
AuthorDate: Fri Feb 6 17:21:54 2026 +0800
[FLINK-38514][postgres] Add support for UUID array type in PostgreSQL CDC connector (#4255)
---
.../docs/connectors/flink-sources/postgres-cdc.md | 1 +
.../docs/connectors/flink-sources/postgres-cdc.md | 1 +
.../postgres/utils/PostgresTypeUtils.java | 1 +
.../postgres/source/PostgresFullTypesITCase.java | 9 +-
.../src/test/resources/ddl/column_type_test.sql | 8 +-
.../table/RowDataDebeziumDeserializeSchema.java | 2 +-
.../postgres/source/utils/PostgresTypeUtils.java | 2 +
.../PostgreSQLDeserializationConverterFactory.java | 74 +++++++++++++
.../postgres/table/PostgreSQLConnectorITCase.java | 116 +++++++++++++++++++++
.../src/test/resources/ddl/column_type_test.sql | 26 ++++-
10 files changed, 234 insertions(+), 6 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index f0e0de525..a9a5b560b 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -736,6 +736,7 @@ Data Type Mapping
CHARACTER(n)<br>
VARCHAR(n)<br>
CHARACTER VARYING(n)<br>
+ UUID<br>
TEXT</td>
<td>STRING</td>
</tr>
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index 533dea16c..a0477e54c 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -740,6 +740,7 @@ Since the order of processing these records cannot be guaranteed, the final valu
CHARACTER(n)<br>
VARCHAR(n)<br>
CHARACTER VARYING(n)<br>
+ UUID<br>
TEXT</td>
<td>STRING</td>
</tr>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 3a7a19be6..9362249c4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -159,6 +159,7 @@ public class PostgresTypeUtils {
case PgOid.DATERANGE_OID:
return DataTypes.STRING();
case PgOid.TEXT_ARRAY:
+ case PgOid.UUID_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PgOid.TIMESTAMP:
return handleTimestampWithTemporalMode(temporalPrecisionMode,
scale);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 22bc89ce0..d397fb14f 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -873,6 +873,12 @@ public class PostgresFullTypesITCase extends
PostgresTestBase {
ArrayData actualIntArray = (ArrayData) actualSnapshotObjects[3];
Assertions.assertThat(actualIntArray.getInt(0)).isEqualTo(42);
+
+ ArrayData actualUuidArray = (ArrayData) actualSnapshotObjects[4];
+ Assertions.assertThat(actualUuidArray.getString(0))
+
.isEqualTo(BinaryStringData.fromString("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"));
+ Assertions.assertThat(actualUuidArray.getString(1))
+
.isEqualTo(BinaryStringData.fromString("b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a22"));
}
@Test
@@ -1117,7 +1123,8 @@ public class PostgresFullTypesITCase extends
PostgresTestBase {
DataTypes.INT(),
DataTypes.ARRAY(DataTypes.STRING()),
DataTypes.ARRAY(DataTypes.INT()),
- DataTypes.ARRAY(DataTypes.INT()));
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.STRING()));
private static final RowType ARRAY_TYPES_WITH_NULL =
RowType.of(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
index a78561ef5..336888a76 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
@@ -159,18 +159,20 @@ CREATE TABLE array_types (
id SERIAL PRIMARY KEY,
text_a1 TEXT[],
int_a1 INTEGER[],
- int_s1 INTEGER[]
+ int_s1 INTEGER[],
+ uuid_a1 UUID[]
);
ALTER TABLE inventory.array_types
REPLICA IDENTITY FULL;
-INSERT INTO array_types (id,text_a1, int_a1, int_s1)
+INSERT INTO array_types (id,text_a1, int_a1, int_s1, uuid_a1)
VALUES
(1,
ARRAY['electronics', 'gadget', 'sale'],
'{85, 90, 78}',
- '{42}'
+ '{42}',
+ ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11',
'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a22']::UUID[]
);
CREATE TABLE array_types_with_null (
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index d4a4634c3..b133753da 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -255,7 +255,7 @@ public final class RowDataDebeziumDeserializeSchema
//
-------------------------------------------------------------------------------------
/** Creates a runtime converter which is null safe. */
- private static DeserializationRuntimeConverter createConverter(
+ public static DeserializationRuntimeConverter createConverter(
LogicalType type,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory
userDefinedConverterFactory) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java
index f8276806f..5d13f3ba7 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java
@@ -61,6 +61,7 @@ public class PostgresTypeUtils {
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private static final String PG_UUID = "uuid";
+ private static final String PG_UUID_ARRAY = "_uuid";
/** Returns a corresponding Flink data type from a debezium {@link
Column}. */
public static DataType fromDbzColumn(Column column) {
@@ -140,6 +141,7 @@ public class PostgresTypeUtils {
case PG_UUID:
return DataTypes.STRING();
case PG_TEXT_ARRAY:
+ case PG_UUID_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PG_TIMESTAMP:
return DataTypes.TIMESTAMP(scale);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java
index 74d183997..397d50daf 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLDeserializationConverterFactory.java
@@ -19,7 +19,10 @@ package org.apache.flink.cdc.connectors.postgres.table;
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter;
import
org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,6 +35,7 @@ import org.apache.kafka.connect.data.Struct;
import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -52,6 +56,8 @@ public class PostgreSQLDeserializationConverterFactory {
switch (logicalType.getTypeRoot()) {
case VARCHAR:
return createStringConverter();
+ case ARRAY:
+ return createArrayConverter((ArrayType) logicalType,
serverTimeZone, this);
default:
// fallback to default converter
return Optional.empty();
@@ -96,4 +102,72 @@ public class PostgreSQLDeserializationConverterFactory {
}
});
}
+
+ private static Optional<DeserializationRuntimeConverter>
createArrayConverter(
+ ArrayType arrayType,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory
userDefinedConverterFactory) {
+ LogicalType elementType = arrayType.getElementType();
+ // Create element converter using the standard converter creation logic
+ DeserializationRuntimeConverter elementConverter =
+ RowDataDebeziumDeserializeSchema.createConverter(
+ elementType, serverTimeZone,
userDefinedConverterFactory);
+
+ return Optional.of(
+ new DeserializationRuntimeConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws
Exception {
+ if (dbzObj == null) {
+ return null;
+ }
+
+ Schema elementSchema = schema.valueSchema();
+ // Multidimensional arrays are not supported
+ if (elementSchema != null && elementSchema.type() ==
Schema.Type.ARRAY) {
+ throw new IllegalArgumentException(
+ "Unable to convert multidimensional array
value '"
+ + dbzObj
+ + "' to a flat array.");
+ }
+
+ if (dbzObj instanceof List) {
+ List<?> list = (List<?>) dbzObj;
+ Object[] array = new Object[list.size()];
+
+ for (int i = 0; i < list.size(); i++) {
+ Object element = list.get(i);
+ if (element == null) {
+ array[i] = null;
+ } else {
+ array[i] =
elementConverter.convert(element, elementSchema);
+ }
+ }
+
+ return new GenericArrayData(array);
+ } else if (dbzObj instanceof Object[]) {
+ Object[] inputArray = (Object[]) dbzObj;
+ Object[] convertedArray = new
Object[inputArray.length];
+
+ for (int i = 0; i < inputArray.length; i++) {
+ if (inputArray[i] == null) {
+ convertedArray[i] = null;
+ } else {
+ convertedArray[i] =
+
elementConverter.convert(inputArray[i], elementSchema);
+ }
+ }
+
+ return new GenericArrayData(convertedArray);
+ }
+
+ throw new IllegalArgumentException(
+ "Unable to convert to Array from unexpected
value '"
+ + dbzObj
+ + "' of type "
+ + dbzObj.getClass().getName());
+ }
+ });
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index 7f4329be0..b0af1815d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -968,6 +968,122 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
result.getJobClient().get().cancel().get();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testArrayTypes(boolean parallelismSnapshot) throws Throwable {
+ setup(parallelismSnapshot);
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE array_types ("
+ + " id INTEGER NOT NULL,"
+ + " text_a1 ARRAY<STRING>,"
+ + " int_a1 ARRAY<INT>,"
+ + " int_s1 ARRAY<INT>,"
+ + " uuid_a1 ARRAY<STRING>"
+ + ") WITH ("
+ + " 'connector' = 'postgres-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ + " 'slot.name' = '%s'"
+ + ")",
+ POSTGIS_CONTAINER.getHost(),
+ POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+ POSTGIS_CONTAINER.getUsername(),
+ POSTGIS_CONTAINER.getPassword(),
+ POSTGIS_CONTAINER.getDatabaseName(),
+ "inventory",
+ "array_types",
+ parallelismSnapshot,
+ getSlotName());
+
+ tEnv.executeSql(sourceDDL);
+
+ String sinkDDL =
+ "CREATE TABLE array_sink ("
+ + " id INTEGER NOT NULL,"
+ + " text_a1 ARRAY<STRING>,"
+ + " int_a1 ARRAY<INT>,"
+ + " int_s1 ARRAY<INT>,"
+ + " uuid_a1 ARRAY<STRING>,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")";
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult tableResult =
+ tEnv.executeSql("INSERT INTO array_sink SELECT * FROM
array_types");
+
+ // wait for snapshot to complete
+ waitForSinkSize("array_sink", 1);
+
+ // verify snapshot data
+ List<String> snapshotResults =
TestValuesTableFactory.getRawResultsAsStrings("array_sink");
+ Assertions.assertThat(snapshotResults).hasSize(1);
+
+ // verify snapshot contains expected array data patterns (insert
record)
+ String snapshotRow = snapshotResults.get(0);
+ Assertions.assertThat(snapshotRow).startsWith("+I(");
+ Assertions.assertThat(snapshotRow).contains("electronics");
+ Assertions.assertThat(snapshotRow).contains("gadget");
+ Assertions.assertThat(snapshotRow).contains("sale");
+ Assertions.assertThat(snapshotRow).contains("85");
+ Assertions.assertThat(snapshotRow).contains("90");
+ Assertions.assertThat(snapshotRow).contains("78");
+ Assertions.assertThat(snapshotRow).contains("42");
+ Assertions.assertThat(snapshotRow)
+ .containsIgnoringCase("227496ad-fde9-ccfb-1f04-892fc505afd5");
+ Assertions.assertThat(snapshotRow)
+ .containsIgnoringCase("9d33f9e2-dfc7-fdef-9478-bcc5dbf7a6d7");
+
+ // wait a bit to make sure the replication slot is ready
+ Thread.sleep(5000);
+
+ // Test incremental (WAL) path - UPDATE array data including uuid_a1
+ try (Connection connection = getJdbcConnection(POSTGIS_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "UPDATE inventory.array_types SET text_a1=ARRAY['updated',
'array'], "
+ + "int_a1='{100, 200}', "
+ +
"uuid_a1=ARRAY['aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
'ffffffff-1111-2222-3333-444444444444']::UUID[] "
+ + "WHERE id=1;");
+ }
+
+ // Wait for update event (-D and +I, total 3 records including initial
+I)
+ waitForSinkSize("array_sink", 3);
+
+ // verify incremental update data with raw changelog
+ List<String> incrementalResults =
+ TestValuesTableFactory.getRawResultsAsStrings("array_sink");
+ Assertions.assertThat(incrementalResults).hasSize(3);
+
+ // verify updated array data is present in results
+ String allResults = String.join(",", incrementalResults);
+ Assertions.assertThat(allResults).contains("-D(");
+ Assertions.assertThat(allResults).contains("updated");
+ Assertions.assertThat(allResults).contains("array");
+ Assertions.assertThat(allResults).contains("100");
+ Assertions.assertThat(allResults).contains("200");
+ Assertions.assertThat(allResults)
+ .containsIgnoringCase("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee");
+ Assertions.assertThat(allResults)
+ .containsIgnoringCase("ffffffff-1111-2222-3333-444444444444");
+ Assertions.assertThat(allResults).contains("42");
+
+ tableResult.getJobClient().get().cancel().get();
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws
Exception {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
index 2d3005b8a..505d634cc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
@@ -56,4 +56,28 @@ INSERT INTO inventory.full_types
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479
-36.7208)'::geometry,
- 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
\ No newline at end of file
+ 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
+
+--
----------------------------------------------------------------------------------------------------------------
+-- TABLE: array_types
+--
----------------------------------------------------------------------------------------------------------------
+-- Test table for array types including UUID[]
+CREATE TABLE array_types (
+ id SERIAL PRIMARY KEY,
+ text_a1 TEXT[],
+ int_a1 INTEGER[],
+ int_s1 INTEGER[],
+ uuid_a1 UUID[]
+);
+
+ALTER TABLE inventory.array_types
+ REPLICA IDENTITY FULL;
+
+INSERT INTO array_types (id, text_a1, int_a1, int_s1, uuid_a1)
+VALUES
+ (1,
+ ARRAY['electronics', 'gadget', 'sale'],
+ '{85, 90, 78}',
+ '{42}',
+ ARRAY['227496Ad-fdE9-CCFb-1F04-892fc505AFD5',
'9d33f9E2-DfC7-fDef-9478-BCc5dBf7a6d7']::UUID[]
+ );
\ No newline at end of file