This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b4d05171b [Fix][Connector-Jdbc]Fixed Vertica data source cannot 
upsert data. (#9607)
7b4d05171b is described below

commit 7b4d05171bef2122f5619b2468560400d65b9f5f
Author: chestnufang <[email protected]>
AuthorDate: Wed Jul 23 10:04:48 2025 +0800

    [Fix][Connector-Jdbc]Fixed Vertica data source cannot upsert data. (#9607)
---
 .../jdbc/internal/JdbcOutputFormatBuilder.java     |   3 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  20 ++++
 .../internal/dialect/vertica/VerticaDialect.java   |  29 ++++-
 .../dialect/vertica/VerticaDialectTest.java        | 124 +++++++++++++++++++++
 4 files changed, 170 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 30b313a79a..ba161f1685 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -187,8 +187,7 @@ public class JdbcOutputFormatBuilder {
         }
         if (enableUpsert) {
             Optional<String> upsertSQL =
-                    dialect.getUpsertStatement(
-                            database, table, tableSchema.getFieldNames(), 
pkNames);
+                    dialect.getUpsertStatementByTableSchema(database, table, 
tableSchema, pkNames);
             if (upsertSQL.isPresent()) {
                 return createSimpleExecutor(
                         upsertSQL.get(),
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 6ec44d92f8..6460bdc875 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
@@ -239,6 +240,25 @@ public interface JdbcDialect extends Serializable {
     Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields);
 
+    /**
+     * Constructs the dialect's upsert statement if supported; such as MySQL's 
{@code ON DUPLICATE KEY
+     * UPDATE}, or PostgreSQL's {@code ON CONFLICT... DO UPDATE SET..}.
+     *
+     * <p>If supported, the returned string will be used as a {@link 
java.sql.PreparedStatement}.
+     * Fields in the statement must be in the same order as the columns of the 
{@code tableSchema}
+     * parameter.
+     *
+     * <p>If the dialect does not support native upsert statements, the writer 
will fall back to
+     * {@code SELECT ROW Exists} + {@code UPDATE}/{@code INSERT} which may 
have poor performance.
+     *
+     * @return the dialect's {@code UPSERT} statement or {@link 
Optional#empty()}.
+     */
+    default Optional<String> getUpsertStatementByTableSchema(
+            String database, String tableName, TableSchema tableSchema, 
String[] uniqueKeyFields) {
+        return getUpsertStatement(
+                database, tableName, tableSchema.getFieldNames(), 
uniqueKeyFields);
+    }
+
     /**
      * Different dialects optimize their PreparedStatement
      *
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
index ac970d1564..1076a42984 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -53,16 +54,36 @@ public class VerticaDialect implements JdbcDialect {
     @Override
     public Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> getUpsertStatementByTableSchema(
+            String database, String tableName, TableSchema tableSchema, 
String[] uniqueKeyFields) {
+        String[] fieldNames = tableSchema.getFieldNames();
         List<String> nonUniqueKeyFields =
                 Arrays.stream(fieldNames)
                         .filter(fieldName -> 
!Arrays.asList(uniqueKeyFields).contains(fieldName))
                         .collect(Collectors.toList());
+        // Vertica JDBC currently requires explicitly specifying the data type
         String valuesBinding =
-                Arrays.stream(fieldNames)
-                        .map(fieldName -> ":" + fieldName + " " + 
quoteIdentifier(fieldName))
+                tableSchema.getColumns().stream()
+                        .map(
+                                column -> {
+                                    String fieldName = column.getName();
+                                    String sourceType = column.getSourceType();
+                                    return "CAST("
+                                            + ":"
+                                            + fieldName
+                                            + " AS "
+                                            + sourceType
+                                            + ")"
+                                            + " AS "
+                                            + quoteIdentifier(fieldName);
+                                })
                         .collect(Collectors.joining(", "));
 
-        String usingClause = String.format("SELECT %s FROM DUAL", 
valuesBinding);
+        String usingClause = String.format("SELECT %s ", valuesBinding);
         String onConditions =
                 Arrays.stream(uniqueKeyFields)
                         .map(
@@ -77,7 +98,7 @@ public class VerticaDialect implements JdbcDialect {
                         .map(
                                 fieldName ->
                                         String.format(
-                                                "TARGET.%s=SOURCE.%s",
+                                                "%s=SOURCE.%s",
                                                 quoteIdentifier(fieldName),
                                                 quoteIdentifier(fieldName)))
                         .collect(Collectors.joining(", "));
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectTest.java
new file mode 100644
index 0000000000..1928480f35
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+public class VerticaDialectTest {
+
+    @Test
+    void testUpsertStatementByTableSchema() {
+        final String dataBaseName = "test_database";
+        final String tableName = "test_table";
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .column(
+                                PhysicalColumn.of(
+                                        "id",
+                                        BasicType.LONG_TYPE,
+                                        22L,
+                                        0,
+                                        false,
+                                        null,
+                                        "id",
+                                        "BIGINT",
+                                        new HashMap<>()))
+                        .column(
+                                PhysicalColumn.of(
+                                        "name",
+                                        BasicType.STRING_TYPE,
+                                        128L,
+                                        0,
+                                        false,
+                                        null,
+                                        "name",
+                                        "VARCHAR",
+                                        new HashMap<>()))
+                        .column(
+                                PhysicalColumn.of(
+                                        "age",
+                                        BasicType.INT_TYPE,
+                                        (Long) null,
+                                        0,
+                                        true,
+                                        null,
+                                        "age",
+                                        "INT",
+                                        new HashMap<>()))
+                        .column(
+                                PhysicalColumn.of(
+                                        "createTime",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        3L,
+                                        0,
+                                        true,
+                                        null,
+                                        "createTime",
+                                        "TIME",
+                                        new HashMap<>()))
+                        .primaryKey(PrimaryKey.of("id", 
Lists.newArrayList("id")))
+                        .constraintKey(
+                                Collections.singletonList(
+                                        ConstraintKey.of(
+                                                
ConstraintKey.ConstraintType.INDEX_KEY,
+                                                "name",
+                                                Lists.newArrayList(
+                                                        
ConstraintKey.ConstraintKeyColumn.of(
+                                                                "name", 
null)))))
+                        .build();
+
+        VerticaDialect dialect = new VerticaDialect();
+        final String[] doUpdateKeyFields = {"id"};
+        final String[] doNothingKeyFields = {"id", "name", "age"};
+
+        String doUpdateSql =
+                dialect.getUpsertStatementByTableSchema(
+                                dataBaseName, tableName, tableSchema, 
doUpdateKeyFields)
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Expected doUpdateSql String 
to be present"));
+        Assertions.assertEquals(
+                doUpdateSql,
+                " MERGE INTO test_database.\"test_table\" TARGET USING (SELECT 
CAST(:id AS BIGINT) AS \"id\", CAST(:name AS VARCHAR) AS \"name\", CAST(:age AS 
INT) AS \"age\", CAST(:createTime AS TIME) AS \"createTime\" ) SOURCE ON 
(TARGET.\"id\"=SOURCE.\"id\")  WHEN MATCHED THEN UPDATE SET 
\"name\"=SOURCE.\"name\", \"age\"=SOURCE.\"age\", 
\"createTime\"=SOURCE.\"createTime\" WHEN NOT MATCHED THEN INSERT (\"id\", 
\"name\", \"age\", \"createTime\") VALUES (SOURCE.\"id\", SOURCE.\"name\",  
[...]
+
+        String upsertCreateTimeSQL =
+                dialect.getUpsertStatementByTableSchema(
+                                dataBaseName, tableName, tableSchema, 
doNothingKeyFields)
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Expected doNothingSql String 
to be present"));
+        Assertions.assertEquals(
+                upsertCreateTimeSQL,
+                " MERGE INTO test_database.\"test_table\" TARGET USING (SELECT 
CAST(:id AS BIGINT) AS \"id\", CAST(:name AS VARCHAR) AS \"name\", CAST(:age AS 
INT) AS \"age\", CAST(:createTime AS TIME) AS \"createTime\" ) SOURCE ON 
(TARGET.\"id\"=SOURCE.\"id\" AND TARGET.\"name\"=SOURCE.\"name\" AND 
TARGET.\"age\"=SOURCE.\"age\")  WHEN MATCHED THEN UPDATE SET 
\"createTime\"=SOURCE.\"createTime\" WHEN NOT MATCHED THEN INSERT (\"id\", 
\"name\", \"age\", \"createTime\") VALUES (SOURCE.\"id\ [...]
+    }
+}

Reply via email to