This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 139919334d [Feature][Connector-V2] Jdbc DB2 support upsert SQL  (#7879)
139919334d is described below

commit 139919334df43b8c320cb1139700a05124f55358
Author: Shashwat Tiwari <[email protected]>
AuthorDate: Mon Oct 21 14:35:29 2024 +0530

    [Feature][Connector-V2] Jdbc DB2 support upsert SQL  (#7879)
---
 .../jdbc/internal/dialect/db2/DB2Dialect.java      |  54 ++++++++-
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  |  22 ++++
 .../connectors/seatunnel/jdbc/JdbcCase.java        |   2 +
 .../connectors/seatunnel/jdbc/JdbcDb2IT.java       |   6 +-
 .../connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java | 133 +++++++++++++++++++++
 .../resources/jdbc_db2_source_and_sink_upsert.conf |  57 +++++++++
 6 files changed, 270 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
index 6150dd4330..5af57bf104 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
@@ -22,7 +22,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import java.util.Arrays;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class DB2Dialect implements JdbcDialect {
 
@@ -44,6 +46,72 @@ public class DB2Dialect implements JdbcDialect {
     @Override
     public Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
-        return Optional.empty();
+        // Build a single-row DB2 MERGE: the bound parameters form the "source" row,
+        // the unique key columns drive the ON match, and a matched row is rewritten
+        // only when at least one non-key column actually differs.
+        String fieldList = String.join(", ", fieldNames);
+
+        // One positional placeholder per field, bound in fieldNames order.
+        String placeholderList =
+                Arrays.stream(fieldNames).map(field -> "?").collect(Collectors.joining(", "));
+
+        // Match target and source rows on every unique key column.
+        String onClause =
+                Arrays.stream(uniqueKeyFields)
+                        .map(field -> "target." + field + " = source." + field)
+                        .collect(Collectors.joining(" AND "));
+
+        // Key columns are already equal on a match, so only the remaining columns
+        // need to be compared and rewritten.
+        String[] nonKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(field -> Arrays.stream(uniqueKeyFields).noneMatch(field::equals))
+                        .toArray(String[]::new);
+
+        // Generate WHEN NOT MATCHED clause
+        String insertClause =
+                "INSERT ("
+                        + fieldList
+                        + ") VALUES ("
+                        + Arrays.stream(fieldNames)
+                                .map(field -> "source." + field)
+                                .collect(Collectors.joining(", "))
+                        + ")";
+
+        StringBuilder mergeStatement = new StringBuilder();
+        mergeStatement.append(
+                String.format(
+                        "MERGE INTO %s.%s AS target USING (VALUES (%s)) AS source (%s) ON %s ",
+                        database,
+                        tableName,
+                        placeholderList,
+                        fieldList,
+                        onClause));
+
+        // When every field is a key column there is nothing to update, so the
+        // WHEN MATCHED branch is omitted and matched rows are left as they are.
+        if (nonKeyFields.length > 0) {
+            // IS DISTINCT FROM is NULL-safe: "<>" yields UNKNOWN when either side is
+            // NULL, which would silently skip changes such as NULL -> 'x' or 'x' -> NULL.
+            // NOTE(review): DB2 may reject comparisons on LOB columns (BLOB/CLOB);
+            // confirm how such columns should be handled here.
+            String changedPredicate =
+                    Arrays.stream(nonKeyFields)
+                            .map(field -> "target." + field + " IS DISTINCT FROM source." + field)
+                            .collect(Collectors.joining(" OR "));
+            String updateSetClause =
+                    Arrays.stream(nonKeyFields)
+                            .map(field -> "target." + field + " = source." + field)
+                            .collect(Collectors.joining(", "));
+            mergeStatement
+                    .append("WHEN MATCHED AND (")
+                    .append(changedPredicate)
+                    .append(") THEN UPDATE SET ")
+                    .append(updateSetClause)
+                    .append(' ');
+        }
+        mergeStatement.append("WHEN NOT MATCHED THEN ").append(insertClause).append(';');
+
+        return Optional.of(mergeStatement.toString());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 24b916d404..9747396b61 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -209,6 +209,17 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
                                     jdbcCase.getSourceTable()));
             statement.execute(createSource);
 
+            if (jdbcCase.getAdditionalSqlOnSource() != null) {
+                String additionalSql =
+                        String.format(
+                                jdbcCase.getAdditionalSqlOnSource(),
+                                buildTableInfoWithSchema(
+                                        jdbcCase.getDatabase(),
+                                        jdbcCase.getSchema(),
+                                        jdbcCase.getSourceTable()));
+                statement.execute(additionalSql);
+            }
+
             if (!jdbcCase.isUseSaveModeCreateTable()) {
                 if (jdbcCase.getSinkCreateSql() != null) {
                     createTemplate = jdbcCase.getSinkCreateSql();
@@ -223,6 +234,17 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
                 statement.execute(createSink);
             }
 
+            if (jdbcCase.getAdditionalSqlOnSink() != null) {
+                String additionalSql =
+                        String.format(
+                                jdbcCase.getAdditionalSqlOnSink(),
+                                buildTableInfoWithSchema(
+                                        jdbcCase.getDatabase(),
+                                        jdbcCase.getSchema(),
+                                        jdbcCase.getSinkTable()));
+                statement.execute(additionalSql);
+            }
+
             connection.commit();
         } catch (Exception exception) {
             log.error(ExceptionUtils.getMessage(exception));
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index 3dd7b64b95..e6bbbd19a7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -48,6 +48,8 @@ public class JdbcCase {
     private String jdbcUrl;
     private String createSql;
     private String sinkCreateSql;
+    private String additionalSqlOnSource;
+    private String additionalSqlOnSink;
     private String insertSql;
     private List<String> configFile;
     private Pair<String[], List<SeaTunnelRow>> testData;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
index 22a29b3b67..a876d9bf7a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
@@ -44,9 +44,9 @@ public class JdbcDb2IT extends AbstractJdbcIT {
 
     private static final String DB2_CONTAINER_HOST = "db2-e2e";
 
-    private static final String DB2_DATABASE = "E2E";
-    private static final String DB2_SOURCE = "SOURCE";
-    private static final String DB2_SINK = "SINK";
+    protected static final String DB2_DATABASE = "E2E";
+    protected static final String DB2_SOURCE = "SOURCE";
+    protected static final String DB2_SINK = "SINK";
 
     private static final String DB2_URL = "jdbc:db2://" + HOST + ":%s/%s";
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java
new file mode 100644
index 0000000000..d6e0147368
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Verifies the DB2 MERGE-based upsert: re-running the same job against an already populated
+ * sink must leave unchanged rows untouched, as observed through an update-timestamp trigger.
+ */
+public class JdbcDb2UpsertIT extends JdbcDb2IT {
+
+    private static final String CONFIG_FILE_PATH = "/jdbc_db2_source_and_sink_upsert.conf";
+
+    private static final String CREATE_SQL_SINK =
+            "create table %s\n"
+                    + "(\n"
+                    + "    C_BOOLEAN          BOOLEAN,\n"
+                    + "    C_SMALLINT         SMALLINT,\n"
+                    + "    C_INT              INTEGER NOT NULL PRIMARY KEY,\n"
+                    + "    C_INTEGER          INTEGER,\n"
+                    + "    C_BIGINT           BIGINT,\n"
+                    + "    C_DECIMAL          DECIMAL(5),\n"
+                    + "    C_DEC              DECIMAL(5),\n"
+                    + "    C_NUMERIC          DECIMAL(5),\n"
+                    + "    C_NUM              DECIMAL(5),\n"
+                    + "    C_REAL             REAL,\n"
+                    + "    C_FLOAT            DOUBLE,\n"
+                    + "    C_DOUBLE           DOUBLE,\n"
+                    + "    C_DOUBLE_PRECISION DOUBLE,\n"
+                    + "    C_CHAR             CHARACTER(1),\n"
+                    + "    C_VARCHAR          VARCHAR(255),\n"
+                    + "    C_BINARY           BINARY(1),\n"
+                    + "    C_VARBINARY        VARBINARY(2048),\n"
+                    + "    C_DATE             DATE,\n"
+                    + "    C_UPDATED_AT       TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n"
+                    + ");\n";
+
+    // Stamps C_UPDATED_AT on every row-level UPDATE of the sink. The upsert statement only
+    // updates a matched row when a column value differs, so an unchanged row keeps its timestamp.
+    private static final String CREATE_TRIGGER_SQL =
+            "CREATE TRIGGER c_updated_at_trigger\n"
+                    + "    BEFORE UPDATE ON %s\n"
+                    + "    REFERENCING NEW AS new_row\n"
+                    + "    FOR EACH ROW\n"
+                    + "BEGIN ATOMIC\n"
+                    + "SET new_row.c_updated_at = CURRENT_TIMESTAMP;\n"
+                    + "END;";
+
+    // Ordered by the primary key so the two snapshots are compared row by row.
+    private static final String SELECT_UPDATED_AT_SQL =
+            "SELECT C_UPDATED_AT FROM %s ORDER BY C_INT";
+
+    private static final List<String> CONFIG_FILE = Lists.newArrayList(CONFIG_FILE_PATH);
+
+    @Override
+    JdbcCase getJdbcCase() {
+        jdbcCase = super.getJdbcCase();
+        jdbcCase.setSinkCreateSql(CREATE_SQL_SINK);
+        jdbcCase.setConfigFile(CONFIG_FILE);
+        jdbcCase.setAdditionalSqlOnSink(CREATE_TRIGGER_SQL);
+        return jdbcCase;
+    }
+
+    @TestTemplate
+    public void testDb2UpsertE2e(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        try {
+            // step 1: run the job to migrate data from source to sink.
+            Container.ExecResult execResult = container.executeJob(CONFIG_FILE_PATH);
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+            List<List<Object>> updatedAtTimestampsBeforeUpdate = queryUpdatedAtTimestamps();
+            // An empty sink would make the comparison below pass vacuously.
+            Assertions.assertFalse(
+                    updatedAtTimestampsBeforeUpdate.isEmpty(),
+                    "sink table is empty after the first run");
+
+            // step 2: run the same job again against the populated sink.
+            // expected: timestamps are unchanged because no column value changed.
+            execResult = container.executeJob(CONFIG_FILE_PATH);
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+            List<List<Object>> updatedAtTimestampsAfterUpdate = queryUpdatedAtTimestamps();
+            Assertions.assertIterableEquals(
+                    updatedAtTimestampsBeforeUpdate, updatedAtTimestampsAfterUpdate);
+        } finally {
+            clearTable(DB2_DATABASE, DB2_SINK);
+        }
+    }
+
+    /** Reads C_UPDATED_AT for every sink row, ordered by primary key. */
+    private List<List<Object>> queryUpdatedAtTimestamps() {
+        return query(
+                String.format(
+                        SELECT_UPDATED_AT_SQL, buildTableInfoWithSchema(DB2_DATABASE, DB2_SINK)));
+    }
+
+    /**
+     * Runs {@code sql} and returns every result row as its column values read with {@link
+     * ResultSet#getString(int)}; the connection is committed after the rows are read.
+     */
+    private List<List<Object>> query(String sql) {
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                List<Object> objects = new ArrayList<>(columnCount);
+                for (int i = 1; i <= columnCount; i++) {
+                    objects.add(resultSet.getString(i));
+                }
+                result.add(objects);
+                log.debug(String.format("Print query, sql: %s, data: %s", sql, objects));
+            }
+            connection.commit();
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf
new file mode 100644
index 0000000000..518a027d34
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Jdbc {
+    driver = com.ibm.db2.jcc.DB2Driver
+    url = "jdbc:db2://db2-e2e:50000/E2E"
+    user = "db2inst1"
+    password = "123456"
+    query = """
+    select * from "E2E".SOURCE;
+    """
+  }
+
+  # If you would like more information about how to configure SeaTunnel and to see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+sink {
+  Jdbc {
+    driver = com.ibm.db2.jcc.DB2Driver
+    url = "jdbc:db2://db2-e2e:50000/E2E"
+    user = "db2inst1"
+    password = "123456"
+    database = "E2E"
+    table = "SINK"
+    enable_upsert = true
+    # Generate the sink SQL automatically; the primary_keys listed below are used to build the upsert statement
+    generate_sink_sql = true
+    primary_keys = [
+      C_INT
+    ]
+  }
+
+  # If you would like more information about how to configure SeaTunnel and to see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}

Reply via email to