This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d0c1de8357 [Fix][Connector-jdbc] Fix postgresql sink trying to update
unique key (#9293) (#9298)
d0c1de8357 is described below
commit d0c1de83576b7f79608c2d99a58ffb5657df3ac0
Author: papadave <[email protected]>
AuthorDate: Fri May 16 09:46:24 2025 +0800
[Fix][Connector-jdbc] Fix postgresql sink trying to update unique key
(#9293) (#9298)
---
.../internal/dialect/psql/PostgresDialect.java | 12 ++++-
.../internal/dialect/psql/PostgresDialectTest.java | 57 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index b56930303d..77f419ad36 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -43,9 +43,11 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CHAR;
@@ -159,20 +161,26 @@ public class PostgresDialect implements JdbcDialect {
Arrays.stream(uniqueKeyFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
+ final Set<String> uniqueKeyFieldsSet = new
HashSet<>(Arrays.asList(uniqueKeyFields));
String updateClause =
Arrays.stream(fieldNames)
+ .filter(fieldName ->
!uniqueKeyFieldsSet.contains(fieldName))
.map(
fieldName ->
quoteIdentifier(fieldName)
+ "=EXCLUDED."
+ quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));
+ String conflictAction =
+ updateClause.isEmpty()
+ ? "DO NOTHING"
+ : String.format("DO UPDATE SET %s", updateClause);
String upsertSQL =
String.format(
- "%s ON CONFLICT (%s) DO UPDATE SET %s",
+ "%s ON CONFLICT (%s) %s",
getInsertIntoStatement(database, tableName,
fieldNames),
uniqueColumns,
- updateClause);
+ conflictAction);
return Optional.of(upsertSQL);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectTest.java
new file mode 100644
index 0000000000..6d2648bb58
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PostgresDialectTest {
+
+    @Test
+    void testUpsertStatement() { // checks both ON CONFLICT actions of the generated upsert SQL
+        PostgresDialect dialect = new PostgresDialect();
+        final String database = "seatunnel";
+        final String tableName = "role";
+        final String[] fieldNames = {
+            "id", "type", "role_name", "description", "create_time", "update_time"
+        };
+        final String[] doUpdateKeyFields = {"id"}; // non-key columns remain -> DO UPDATE SET
+        final String[] doNothingKeyFields = { // every column is a key -> DO NOTHING
+            "id", "type", "role_name", "description", "create_time", "update_time"
+        };
+
+        String doUpdateSql =
+                dialect.getUpsertStatement(database, tableName, fieldNames, doUpdateKeyFields)
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Expected doUpdateSql String to be present"));
+        Assertions.assertEquals( // JUnit order: expected value first, actual value second
+                "INSERT INTO \"seatunnel\".\"role\" (\"id\", \"type\", \"role_name\", \"description\", \"create_time\", \"update_time\") VALUES (:id, :type, :role_name, :description, :create_time, :update_time) ON CONFLICT (\"id\") DO UPDATE SET \"type\"=EXCLUDED.\"type\", \"role_name\"=EXCLUDED.\"role_name\", \"description\"=EXCLUDED.\"description\", \"create_time\"=EXCLUDED.\"create_time\", \"update_time\"=EXCLUDED.\"update_time\"",
+                doUpdateSql);
+        String doNothingSql =
+                dialect.getUpsertStatement(database, tableName, fieldNames, doNothingKeyFields)
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Expected doNothingSql String to be present"));
+        Assertions.assertEquals( // JUnit order: expected value first, actual value second
+                "INSERT INTO \"seatunnel\".\"role\" (\"id\", \"type\", \"role_name\", \"description\", \"create_time\", \"update_time\") VALUES (:id, :type, :role_name, :description, :create_time, :update_time) ON CONFLICT (\"id\", \"type\", \"role_name\", \"description\", \"create_time\", \"update_time\") DO NOTHING",
+                doNothingSql);
+    }
+}