This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 40e28438 [FLINK-38850] Support PostgreSQL uuid type (#182)
40e28438 is described below

commit 40e284388d022db2cc6263300e518a1ed993d43f
Author: Chan hae OH <[email protected]>
AuthorDate: Fri Mar 6 00:30:37 2026 +0900

    [FLINK-38850] Support PostgreSQL uuid type (#182)
    
    [FLINK-38850] Support PostgreSQL uuid type
    
    Co-authored-by: och5351 <[email protected]>
---
 .../database/dialect/AbstractDialectConverter.java |  2 +-
 .../database/catalog/PostgresTypeMapper.java       |  3 ++
 .../database/catalog/PostgresCatalogITCase.java    | 20 ++++++++++
 .../database/catalog/PostgresCatalogTest.java      | 19 ++++++++-
 .../database/catalog/PostgresCatalogTestBase.java  | 46 ++++++++++++++++++++++
 .../database/dialect/PostgresDialectTest.java      |  2 +
 .../table/PostgresDynamicTableSourceITCase.java    |  8 ++++
 .../jdbc/postgres/testutils/PostgresMetadata.java  |  6 ++-
 8 files changed, 103 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
index e27dd852..0ba4dfd0 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
@@ -173,7 +173,7 @@ public abstract class AbstractDialectConverter implements 
JdbcDialectConverter {
                                 : TimestampData.fromTimestamp((Timestamp) val);
             case CHAR:
             case VARCHAR:
-                return val -> StringData.fromString((String) val);
+                return val -> StringData.fromString(val == null ? null : 
val.toString());
             case BINARY:
             case VARBINARY:
                 return val -> val;
diff --git 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
index 5fc19ea3..04a21255 100644
--- 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
+++ 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
@@ -84,6 +84,7 @@ public class PostgresTypeMapper implements 
JdbcCatalogTypeMapper {
     private static final String PG_CHARACTER_ARRAY = "_character";
     private static final String PG_CHARACTER_VARYING = "varchar";
     private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+    private static final String PG_UUID = "uuid";
 
     @Override
     public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, 
int colIndex)
@@ -176,6 +177,8 @@ public class PostgresTypeMapper implements 
JdbcCatalogTypeMapper {
                 return DataTypes.DATE();
             case PG_DATE_ARRAY:
                 return DataTypes.ARRAY(DataTypes.DATE());
+            case PG_UUID:
+                return DataTypes.VARCHAR(36);
             default:
                 return null;
         }
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
index ff7eabd7..c829f19d 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
@@ -193,4 +193,24 @@ class PostgresCatalogITCase extends 
PostgresCatalogTestBase {
                                 + "9223372036854775807, "
                                 + "9223372036854775807]]");
     }
+
+    @Test
+    void testUuidTypes() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", 
TABLE_UUID_TYPE))
+                                .execute()
+                                .collect());
+        assertThat(results).hasToString("[+I[1, 
123e4567-e89b-12d3-a456-426614174000]]");
+    }
+
+    @Test
+    void testNullUuidTypes() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", 
TABLE_UUID_TYPE2))
+                                .execute()
+                                .collect());
+        assertThat(results).hasToString("[+I[1, null]]");
+    }
 }
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
index 56da37c3..9ae3b9c4 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
@@ -73,7 +73,9 @@ class PostgresCatalogTest extends PostgresCatalogTestBase {
                                 "public.serial_table",
                                 "public.t1",
                                 "public.t4",
-                                "public.t5"));
+                                "public.t5",
+                                "public.uuid_table",
+                                "public.uuid_table2"));
 
         actual = catalog.listTables(TEST_DB);
 
@@ -186,4 +188,19 @@ class PostgresCatalogTest extends PostgresCatalogTestBase {
 
         
assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema);
     }
+
+    @Test
+    void testUuidDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(new 
ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
+        
assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema);
+    }
+
+    @Test
+    void testNullUuidDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(
+                        new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, 
TABLE_UUID_TYPE2));
+        
assertThat(table.getUnresolvedSchema()).isEqualTo(getNullUuidTable().schema);
+    }
 }
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
index f3d0d1c2..6477d9a1 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
@@ -55,6 +55,8 @@ class PostgresCatalogTestBase implements JdbcITCaseBase, 
PostgresTestBase {
     protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
     protected static final String TABLE_ARRAY_TYPE = "array_table";
     protected static final String TABLE_SERIAL_TYPE = "serial_table";
+    protected static final String TABLE_UUID_TYPE = "uuid_table";
+    protected static final String TABLE_UUID_TYPE2 = "uuid_table2";
 
     protected static String baseUrl;
     protected static PostgresCatalog catalog;
@@ -108,6 +110,11 @@ class PostgresCatalogTestBase implements JdbcITCaseBase, 
PostgresTestBase {
         createTable(
                 PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
                 getSerialTable().pgSchemaSql);
+        createTable(
+                PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE), 
getUuidTable().pgSchemaSql);
+        createTable(
+                PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
+                getNullUuidTable().pgSchemaSql);
 
         executeSQL(
                 PostgresCatalog.DEFAULT_DATABASE,
@@ -126,6 +133,15 @@ class PostgresCatalogTestBase implements JdbcITCaseBase, 
PostgresTestBase {
                 PostgresCatalog.DEFAULT_DATABASE,
                 String.format(
                         "insert into %s values (%s);", TABLE_SERIAL_TYPE, 
getSerialTable().values));
+        executeSQL(
+                PostgresCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "insert into %s values (%s);", TABLE_UUID_TYPE, 
getUuidTable().values));
+        executeSQL(
+                PostgresCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "insert into %s values (%s);",
+                        TABLE_UUID_TYPE2, getNullUuidTable().values));
     }
 
     @AfterAll
@@ -162,6 +178,14 @@ class PostgresCatalogTestBase implements JdbcITCaseBase, 
PostgresTestBase {
                 PostgresCatalog.DEFAULT_DATABASE,
                 String.format(
                         "DROP TABLE %s ", 
PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE)));
+        executeSQL(
+                PostgresCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "DROP TABLE %s ", 
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE)));
+        executeSQL(
+                PostgresCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "DROP TABLE %s ", 
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2)));
     }
 
     public static void createTable(PostgresTablePath tablePath, String 
tableSchemaSql)
@@ -393,4 +417,26 @@ class PostgresCatalogTestBase implements JdbcITCaseBase, 
PostgresTestBase {
                         + "9223372036854775807,"
                         + "9223372036854775807");
     }
+
+    public static TestTable getUuidTable() {
+        String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
+        return new TestTable(
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("uid_col", DataTypes.VARCHAR(36))
+                        .build(),
+                "id INT, " + "uid_col UUID",
+                String.format("1, '%s'", uuid1));
+    }
+
+    public static TestTable getNullUuidTable() {
+
+        return new TestTable(
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("uid_col", DataTypes.VARCHAR(36))
+                        .build(),
+                "id INT, " + "uid_col UUID",
+                "1, NULL");
+    }
 }
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
index 0ad1fb55..dc9290de 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
@@ -34,6 +34,8 @@ class PostgresDialectTest extends JdbcDialectTest implements 
PostgresTestBase {
     @Override
     protected List<TestItem> testData() {
         return Arrays.asList(
+                createTestItem("STRING"),
+                createTestItem("ARRAY<STRING>"),
                 createTestItem("CHAR"),
                 createTestItem("VARCHAR"),
                 createTestItem("BOOLEAN"),
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
index 079260d9..fb208d08 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
@@ -44,6 +44,8 @@ class PostgresDynamicTableSourceITCase extends 
JdbcDynamicTableSourceITCase
         return tableRow(
                 "jdbDynamicTableSource",
                 field("id", DataTypes.BIGINT().notNull()),
+                // uuid test field
+                field("uid_col", dbType("uuid"), DataTypes.STRING().notNull()),
                 field("decimal_col", DataTypes.DECIMAL(10, 4)),
                 field("timestamp6_col", DataTypes.TIMESTAMP(6)),
                 // other fields
@@ -53,9 +55,14 @@ class PostgresDynamicTableSourceITCase extends 
JdbcDynamicTableSourceITCase
     }
 
     protected List<Row> getTestData() {
+
+        String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
+        String uuid2 = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11";
+
         return Arrays.asList(
                 Row.of(
                         1L,
+                        uuid1,
                         BigDecimal.valueOf(100.1234),
                         LocalDateTime.parse("2020-01-01T15:35:00.123456"),
                         1.175E-37F,
@@ -63,6 +70,7 @@ class PostgresDynamicTableSourceITCase extends 
JdbcDynamicTableSourceITCase
                         LocalTime.parse("15:35")),
                 Row.of(
                         2L,
+                        uuid2,
                         BigDecimal.valueOf(101.1234),
                         LocalDateTime.parse("2020-01-01T15:36:01.123456"),
                         -1.175E-37F,
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
index 2d083e05..220df3b5 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
@@ -42,7 +42,11 @@ public class PostgresMetadata implements DatabaseMetadata {
     public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean 
hasXaEnabled) {
         this.username = container.getUsername();
         this.password = container.getPassword();
-        this.url = container.getJdbcUrl();
+        String baseUrl = container.getJdbcUrl();
+        this.url =
+                baseUrl.contains("?")
+                        ? baseUrl + "&stringtype=unspecified"
+                        : baseUrl + "?stringtype=unspecified";
         this.driver = container.getDriverClassName();
         this.version = container.getDockerImageName();
         this.xaEnabled = hasXaEnabled;

Reply via email to