This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 04db40d973 [Fix] [Connector-V2] Postgres support for multiple primary
keys (#8526)
04db40d973 is described below
commit 04db40d97372ffd5a572e0a8a54dfd33189c8529
Author: shirukai <[email protected]>
AuthorDate: Fri Jan 17 20:34:06 2025 +0800
[Fix] [Connector-V2] Postgres support for multiple primary keys (#8526)
---
.../psql/PostgresCreateTableSqlBuilder.java | 26 +++++++++++++++-------
.../psql/PostgresCreateTableSqlBuilderTest.java | 5 +++--
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
index 1fbfd7c095..1fb57f3e9f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
@@ -73,6 +73,11 @@ public class PostgresCreateTableSqlBuilder {
buildColumnSql(column),
fieldIde))
.collect(Collectors.toList());
+ // add primary key
+ if (createIndex && primaryKey != null) {
+ columnSqls.add("\t" + buildPrimaryKeySql());
+ }
+
if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
for (ConstraintKey constraintKey : constraintKeys) {
if (StringUtils.isBlank(constraintKey.getConstraintName())
@@ -134,14 +139,6 @@ public class PostgresCreateTableSqlBuilder {
if (!column.isNullable()) {
columnSql.append(" NOT NULL");
}
-
- // Add primary key directly after the column if it is a primary key
- if (createIndex
- && primaryKey != null
- && primaryKey.getColumnNames().contains(column.getName())) {
- columnSql.append(" PRIMARY KEY");
- }
-
return columnSql.toString();
}
@@ -163,6 +160,19 @@ public class PostgresCreateTableSqlBuilder {
return columnCommentSql.toString();
}
+ private String buildPrimaryKeySql() {
+ String constraintName = UUID.randomUUID().toString().replace("-", "");
+ String primaryKeyColumns =
+ primaryKey.getColumnNames().stream()
+ .map(
+ column ->
+ String.format(
+ "\"%s\"",
+
CatalogUtils.getFieldIde(column, fieldIde)))
+ .collect(Collectors.joining(","));
+ return "CONSTRAINT \"" + constraintName + "\" PRIMARY KEY (" +
primaryKeyColumns + ")";
+ }
+
private String buildUniqueKeySql(ConstraintKey constraintKey) {
String constraintName = UUID.randomUUID().toString().replace("-", "");
String indexColumns =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
index 03b99b1ca0..cc820a4ed3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
@@ -52,9 +52,10 @@ class PostgresCreateTableSqlBuilderTest {
catalogTable.getTableId().toTablePath());
String pattern =
"CREATE TABLE \"test\" \\(\n"
- + "\"id\" int4 NOT NULL PRIMARY
KEY,\n"
+ + "\"id\" int4 NOT NULL,\n"
+ "\"name\" text NOT NULL,\n"
+ "\"age\" int4 NOT NULL,\n"
+ + "\tCONSTRAINT \"([a-zA-Z0-9]+)\"
PRIMARY KEY \\(\"id\",\"name\"\\),\n"
+ "\tCONSTRAINT \"([a-zA-Z0-9]+)\"
UNIQUE \\(\"name\"\\)\n"
+ "\\);";
Assertions.assertTrue(
@@ -142,7 +143,7 @@ class PostgresCreateTableSqlBuilderTest {
TableSchema tableSchema =
TableSchema.builder()
.columns(columns)
- .primaryKey(PrimaryKey.of("pk_id",
Lists.newArrayList("id")))
+ .primaryKey(PrimaryKey.of("pk_id_name",
Lists.newArrayList("id", "name")))
.constraintKey(
Lists.newArrayList(
ConstraintKey.of(