This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 40e28438 [FLINK-38850] Support PostgreSQL uuid type (#182)
40e28438 is described below
commit 40e284388d022db2cc6263300e518a1ed993d43f
Author: Chan hae OH <[email protected]>
AuthorDate: Fri Mar 6 00:30:37 2026 +0900
[FLINK-38850] Support PostgreSQL uuid type (#182)
[FLINK-38850] Support PostgreSQL uuid type
Co-authored-by: och5351 <[email protected]>
---
.../database/dialect/AbstractDialectConverter.java | 2 +-
.../database/catalog/PostgresTypeMapper.java | 3 ++
.../database/catalog/PostgresCatalogITCase.java | 20 ++++++++++
.../database/catalog/PostgresCatalogTest.java | 19 ++++++++-
.../database/catalog/PostgresCatalogTestBase.java | 46 ++++++++++++++++++++++
.../database/dialect/PostgresDialectTest.java | 2 +
.../table/PostgresDynamicTableSourceITCase.java | 8 ++++
.../jdbc/postgres/testutils/PostgresMetadata.java | 6 ++-
8 files changed, 103 insertions(+), 3 deletions(-)
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
index e27dd852..0ba4dfd0 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java
@@ -173,7 +173,7 @@ public abstract class AbstractDialectConverter implements
JdbcDialectConverter {
: TimestampData.fromTimestamp((Timestamp) val);
case CHAR:
case VARCHAR:
- return val -> StringData.fromString((String) val);
+ return val -> StringData.fromString(val == null ? null :
val.toString());
case BINARY:
case VARBINARY:
return val -> val;
diff --git
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
index 5fc19ea3..04a21255 100644
---
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
+++
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java
@@ -84,6 +84,7 @@ public class PostgresTypeMapper implements
JdbcCatalogTypeMapper {
private static final String PG_CHARACTER_ARRAY = "_character";
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+ private static final String PG_UUID = "uuid";
@Override
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata,
int colIndex)
@@ -176,6 +177,8 @@ public class PostgresTypeMapper implements
JdbcCatalogTypeMapper {
return DataTypes.DATE();
case PG_DATE_ARRAY:
return DataTypes.ARRAY(DataTypes.DATE());
+ case PG_UUID:
+ return DataTypes.VARCHAR(36);
default:
return null;
}
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
index ff7eabd7..c829f19d 100644
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java
@@ -193,4 +193,24 @@ class PostgresCatalogITCase extends
PostgresCatalogTestBase {
+ "9223372036854775807, "
+ "9223372036854775807]]");
}
+
+ @Test
+ void testUuidTypes() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s",
TABLE_UUID_TYPE))
+ .execute()
+ .collect());
+ assertThat(results).hasToString("[+I[1,
123e4567-e89b-12d3-a456-426614174000]]");
+ }
+
+ @Test
+ void testNullUuidTypes() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s",
TABLE_UUID_TYPE2))
+ .execute()
+ .collect());
+ assertThat(results).hasToString("[+I[1, null]]");
+ }
}
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
index 56da37c3..9ae3b9c4 100644
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
@@ -73,7 +73,9 @@ class PostgresCatalogTest extends PostgresCatalogTestBase {
"public.serial_table",
"public.t1",
"public.t4",
- "public.t5"));
+ "public.t5",
+ "public.uuid_table",
+ "public.uuid_table2"));
actual = catalog.listTables(TEST_DB);
@@ -186,4 +188,19 @@ class PostgresCatalogTest extends PostgresCatalogTestBase {
assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema);
}
+
+ @Test
+ void testUuidDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(new
ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
+
assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema);
+ }
+
+ @Test
+ void testNullUuidDataTypes() throws TableNotExistException {
+ CatalogBaseTable table =
+ catalog.getTable(
+ new ObjectPath(PostgresCatalog.DEFAULT_DATABASE,
TABLE_UUID_TYPE2));
+
assertThat(table.getUnresolvedSchema()).isEqualTo(getNullUuidTable().schema);
+ }
}
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
index f3d0d1c2..6477d9a1 100644
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
@@ -55,6 +55,8 @@ class PostgresCatalogTestBase implements JdbcITCaseBase,
PostgresTestBase {
protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
protected static final String TABLE_ARRAY_TYPE = "array_table";
protected static final String TABLE_SERIAL_TYPE = "serial_table";
+ protected static final String TABLE_UUID_TYPE = "uuid_table";
+ protected static final String TABLE_UUID_TYPE2 = "uuid_table2";
protected static String baseUrl;
protected static PostgresCatalog catalog;
@@ -108,6 +110,11 @@ class PostgresCatalogTestBase implements JdbcITCaseBase,
PostgresTestBase {
createTable(
PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
getSerialTable().pgSchemaSql);
+ createTable(
+ PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE),
getUuidTable().pgSchemaSql);
+ createTable(
+ PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
+ getNullUuidTable().pgSchemaSql);
executeSQL(
PostgresCatalog.DEFAULT_DATABASE,
@@ -126,6 +133,15 @@ class PostgresCatalogTestBase implements JdbcITCaseBase,
PostgresTestBase {
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"insert into %s values (%s);", TABLE_SERIAL_TYPE,
getSerialTable().values));
+ executeSQL(
+ PostgresCatalog.DEFAULT_DATABASE,
+ String.format(
+ "insert into %s values (%s);", TABLE_UUID_TYPE,
getUuidTable().values));
+ executeSQL(
+ PostgresCatalog.DEFAULT_DATABASE,
+ String.format(
+ "insert into %s values (%s);",
+ TABLE_UUID_TYPE2, getNullUuidTable().values));
}
@AfterAll
@@ -162,6 +178,14 @@ class PostgresCatalogTestBase implements JdbcITCaseBase,
PostgresTestBase {
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"DROP TABLE %s ",
PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE)));
+ executeSQL(
+ PostgresCatalog.DEFAULT_DATABASE,
+ String.format(
+ "DROP TABLE %s ",
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE)));
+ executeSQL(
+ PostgresCatalog.DEFAULT_DATABASE,
+ String.format(
+ "DROP TABLE %s ",
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2)));
}
public static void createTable(PostgresTablePath tablePath, String
tableSchemaSql)
@@ -393,4 +417,26 @@ class PostgresCatalogTestBase implements JdbcITCaseBase,
PostgresTestBase {
+ "9223372036854775807,"
+ "9223372036854775807");
}
+
+ public static TestTable getUuidTable() {
+ String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
+ return new TestTable(
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("uid_col", DataTypes.VARCHAR(36))
+ .build(),
+ "id INT, " + "uid_col UUID",
+ String.format("1, '%s'", uuid1));
+ }
+
+ public static TestTable getNullUuidTable() {
+
+ return new TestTable(
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("uid_col", DataTypes.VARCHAR(36))
+ .build(),
+ "id INT, " + "uid_col UUID",
+ "1, NULL");
+ }
}
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
index 0ad1fb55..dc9290de 100644
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java
@@ -34,6 +34,8 @@ class PostgresDialectTest extends JdbcDialectTest implements
PostgresTestBase {
@Override
protected List<TestItem> testData() {
return Arrays.asList(
+ createTestItem("STRING"),
+ createTestItem("ARRAY<STRING>"),
createTestItem("CHAR"),
createTestItem("VARCHAR"),
createTestItem("BOOLEAN"),
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
index 079260d9..fb208d08 100644
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
@@ -44,6 +44,8 @@ class PostgresDynamicTableSourceITCase extends
JdbcDynamicTableSourceITCase
return tableRow(
"jdbDynamicTableSource",
field("id", DataTypes.BIGINT().notNull()),
+ // uuid test field
+ field("uid_col", dbType("uuid"), DataTypes.STRING().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
// other fields
@@ -53,9 +55,14 @@ class PostgresDynamicTableSourceITCase extends
JdbcDynamicTableSourceITCase
}
protected List<Row> getTestData() {
+
+ String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
+ String uuid2 = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11";
+
return Arrays.asList(
Row.of(
1L,
+ uuid1,
BigDecimal.valueOf(100.1234),
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
1.175E-37F,
@@ -63,6 +70,7 @@ class PostgresDynamicTableSourceITCase extends
JdbcDynamicTableSourceITCase
LocalTime.parse("15:35")),
Row.of(
2L,
+ uuid2,
BigDecimal.valueOf(101.1234),
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
-1.175E-37F,
diff --git
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
index 2d083e05..220df3b5 100644
---
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
+++
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
@@ -42,7 +42,11 @@ public class PostgresMetadata implements DatabaseMetadata {
public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean
hasXaEnabled) {
this.username = container.getUsername();
this.password = container.getPassword();
- this.url = container.getJdbcUrl();
+ String baseUrl = container.getJdbcUrl();
+ this.url =
+ baseUrl.contains("?")
+ ? baseUrl + "&stringtype=unspecified"
+ : baseUrl + "?stringtype=unspecified";
this.driver = container.getDriverClassName();
this.version = container.getDockerImageName();
this.xaEnabled = hasXaEnabled;