This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d0c1de8357 [Fix][Connector-jdbc] Fix postgresql sink trying to update 
unique key (#9293) (#9298)
d0c1de8357 is described below

commit d0c1de83576b7f79608c2d99a58ffb5657df3ac0
Author: papadave <[email protected]>
AuthorDate: Fri May 16 09:46:24 2025 +0800

    [Fix][Connector-jdbc] Fix postgresql sink trying to update unique key 
(#9293) (#9298)
---
 .../internal/dialect/psql/PostgresDialect.java     | 12 ++++-
 .../internal/dialect/psql/PostgresDialectTest.java | 57 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index b56930303d..77f419ad36 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -43,9 +43,11 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CHAR;
@@ -159,20 +161,26 @@ public class PostgresDialect implements JdbcDialect {
                 Arrays.stream(uniqueKeyFields)
                         .map(this::quoteIdentifier)
                         .collect(Collectors.joining(", "));
+        final Set<String> uniqueKeyFieldsSet = new 
HashSet<>(Arrays.asList(uniqueKeyFields));
         String updateClause =
                 Arrays.stream(fieldNames)
+                        .filter(fieldName -> 
!uniqueKeyFieldsSet.contains(fieldName))
                         .map(
                                 fieldName ->
                                         quoteIdentifier(fieldName)
                                                 + "=EXCLUDED."
                                                 + quoteIdentifier(fieldName))
                         .collect(Collectors.joining(", "));
+        String conflictAction =
+                updateClause.isEmpty()
+                        ? "DO NOTHING"
+                        : String.format("DO UPDATE SET %s", updateClause);
         String upsertSQL =
                 String.format(
-                        "%s ON CONFLICT (%s) DO UPDATE SET %s",
+                        "%s ON CONFLICT (%s) %s",
                         getInsertIntoStatement(database, tableName, 
fieldNames),
                         uniqueColumns,
-                        updateClause);
+                        conflictAction);
         return Optional.of(upsertSQL);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectTest.java
new file mode 100644
index 0000000000..6d2648bb58
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PostgresDialectTest {
+
+    @Test
+    void testUpsertStatement() {
+        PostgresDialect dialect = new PostgresDialect();
+        final String database = "seatunnel";
+        final String tableName = "role";
+        final String[] fieldNames = {
+            "id", "type", "role_name", "description", "create_time", 
"update_time"
+        };
+        final String[] doUpdateKeyFields = {"id"};
+        final String[] doNothingKeyFields = {
+            "id", "type", "role_name", "description", "create_time", 
"update_time"
+        };
+
+        String doUpdateSql =
+                dialect.getUpsertStatement(database, tableName, fieldNames, 
doUpdateKeyFields)
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Expected doUpdateSql String 
to be present"));
+        Assertions.assertEquals(
+                doUpdateSql,
+                "INSERT INTO \"seatunnel\".\"role\" (\"id\", \"type\", 
\"role_name\", \"description\", \"create_time\", \"update_time\") VALUES (:id, 
:type, :role_name, :description, :create_time, :update_time) ON CONFLICT 
(\"id\") DO UPDATE SET \"type\"=EXCLUDED.\"type\", 
\"role_name\"=EXCLUDED.\"role_name\", \"description\"=EXCLUDED.\"description\", 
\"create_time\"=EXCLUDED.\"create_time\", 
\"update_time\"=EXCLUDED.\"update_time\"");
+        String doNothingSql =
+                dialect.getUpsertStatement(database, tableName, fieldNames, 
doNothingKeyFields)
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Expected doNothingSql String 
to be present"));
+        Assertions.assertEquals(
+                doNothingSql,
+                "INSERT INTO \"seatunnel\".\"role\" (\"id\", \"type\", 
\"role_name\", \"description\", \"create_time\", \"update_time\") VALUES (:id, 
:type, :role_name, :description, :create_time, :update_time) ON CONFLICT 
(\"id\", \"type\", \"role_name\", \"description\", \"create_time\", 
\"update_time\") DO NOTHING");
+    }
+}

Reply via email to