This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 139919334d [Feature][Connector-V2] Jdbc DB2 support upsert SQL (#7879)
139919334d is described below
commit 139919334df43b8c320cb1139700a05124f55358
Author: Shashwat Tiwari <[email protected]>
AuthorDate: Mon Oct 21 14:35:29 2024 +0530
[Feature][Connector-V2] Jdbc DB2 support upsert SQL (#7879)
---
.../jdbc/internal/dialect/db2/DB2Dialect.java | 54 ++++++++-
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 22 ++++
.../connectors/seatunnel/jdbc/JdbcCase.java | 2 +
.../connectors/seatunnel/jdbc/JdbcDb2IT.java | 6 +-
.../connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java | 133 +++++++++++++++++++++
.../resources/jdbc_db2_source_and_sink_upsert.conf | 57 +++++++++
6 files changed, 270 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
index 6150dd4330..5af57bf104 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
@@ -22,7 +22,9 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import java.util.Arrays;
import java.util.Optional;
+import java.util.stream.Collectors;
public class DB2Dialect implements JdbcDialect {
@@ -44,6 +46,56 @@ public class DB2Dialect implements JdbcDialect {
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
- return Optional.empty();
+ // Generate field list for USING and INSERT clauses
+ String fieldList = String.join(", ", fieldNames);
+
+ // Generate placeholder list for VALUES clause
+ String placeholderList =
+ Arrays.stream(fieldNames).map(field ->
"?").collect(Collectors.joining(", "));
+
+ // Generate ON clause
+ String onClause =
+ Arrays.stream(uniqueKeyFields)
+ .map(field -> "target." + field + " = source." + field)
+ .collect(Collectors.joining(" AND "));
+
+ // Generate the change-detection condition for the WHEN MATCHED clause
+ String whenMatchedClause =
+ Arrays.stream(fieldNames)
+ .map(field -> "target." + field + " <> source." +
field)
+ .collect(Collectors.joining(" OR "));
+
+ // Generate UPDATE SET clause
+ String updateSetClause =
+ Arrays.stream(fieldNames)
+ .map(field -> "target." + field + " = source." + field)
+ .collect(Collectors.joining(", "));
+
+ // Generate the INSERT action for the WHEN NOT MATCHED clause
+ String insertClause =
+ "INSERT ("
+ + fieldList
+ + ") VALUES ("
+ + Arrays.stream(fieldNames)
+ .map(field -> "source." + field)
+ .collect(Collectors.joining(", "))
+ + ")";
+
+ // Combine all parts to form the final SQL statement
+ String mergeStatement =
+ String.format(
+ "MERGE INTO %s.%s AS target USING (VALUES (%s)) AS
source (%s) ON %s "
+ + "WHEN MATCHED AND (%s) THEN UPDATE SET %s "
+ + "WHEN NOT MATCHED THEN %s;",
+ database,
+ tableName,
+ placeholderList,
+ fieldList,
+ onClause,
+ whenMatchedClause,
+ updateSetClause,
+ insertClause);
+
+ return Optional.of(mergeStatement);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 24b916d404..9747396b61 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -209,6 +209,17 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
jdbcCase.getSourceTable()));
statement.execute(createSource);
+ if (jdbcCase.getAdditionalSqlOnSource() != null) {
+ String additionalSql =
+ String.format(
+ jdbcCase.getAdditionalSqlOnSource(),
+ buildTableInfoWithSchema(
+ jdbcCase.getDatabase(),
+ jdbcCase.getSchema(),
+ jdbcCase.getSourceTable()));
+ statement.execute(additionalSql);
+ }
+
if (!jdbcCase.isUseSaveModeCreateTable()) {
if (jdbcCase.getSinkCreateSql() != null) {
createTemplate = jdbcCase.getSinkCreateSql();
@@ -223,6 +234,17 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
statement.execute(createSink);
}
+ if (jdbcCase.getAdditionalSqlOnSink() != null) {
+ String additionalSql =
+ String.format(
+ jdbcCase.getAdditionalSqlOnSink(),
+ buildTableInfoWithSchema(
+ jdbcCase.getDatabase(),
+ jdbcCase.getSchema(),
+ jdbcCase.getSinkTable()));
+ statement.execute(additionalSql);
+ }
+
connection.commit();
} catch (Exception exception) {
log.error(ExceptionUtils.getMessage(exception));
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index 3dd7b64b95..e6bbbd19a7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -48,6 +48,8 @@ public class JdbcCase {
private String jdbcUrl;
private String createSql;
private String sinkCreateSql;
+ private String additionalSqlOnSource;
+ private String additionalSqlOnSink;
private String insertSql;
private List<String> configFile;
private Pair<String[], List<SeaTunnelRow>> testData;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
index 22a29b3b67..a876d9bf7a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
@@ -44,9 +44,9 @@ public class JdbcDb2IT extends AbstractJdbcIT {
private static final String DB2_CONTAINER_HOST = "db2-e2e";
- private static final String DB2_DATABASE = "E2E";
- private static final String DB2_SOURCE = "SOURCE";
- private static final String DB2_SINK = "SINK";
+ protected static final String DB2_DATABASE = "E2E";
+ protected static final String DB2_SOURCE = "SOURCE";
+ protected static final String DB2_SINK = "SINK";
private static final String DB2_URL = "jdbc:db2://" + HOST + ":%s/%s";
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java
new file mode 100644
index 0000000000..d6e0147368
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JdbcDb2UpsertIT extends JdbcDb2IT {
+
+ private static final String CREATE_SQL_SINK =
+ "create table %s\n"
+ + "(\n"
+ + " C_BOOLEAN BOOLEAN,\n"
+ + " C_SMALLINT SMALLINT,\n"
+ + " C_INT INTEGER NOT NULL PRIMARY KEY,\n"
+ + " C_INTEGER INTEGER,\n"
+ + " C_BIGINT BIGINT,\n"
+ + " C_DECIMAL DECIMAL(5),\n"
+ + " C_DEC DECIMAL(5),\n"
+ + " C_NUMERIC DECIMAL(5),\n"
+ + " C_NUM DECIMAL(5),\n"
+ + " C_REAL REAL,\n"
+ + " C_FLOAT DOUBLE,\n"
+ + " C_DOUBLE DOUBLE,\n"
+ + " C_DOUBLE_PRECISION DOUBLE,\n"
+ + " C_CHAR CHARACTER(1),\n"
+ + " C_VARCHAR VARCHAR(255),\n"
+ + " C_BINARY BINARY(1),\n"
+ + " C_VARBINARY VARBINARY(2048),\n"
+ + " C_DATE DATE,\n"
+ + " C_UPDATED_AT TIMESTAMP DEFAULT
CURRENT_TIMESTAMP\n"
+ + ");\n";
+
+ // Create a trigger that sets C_UPDATED_AT to the current timestamp whenever a row is updated.
+ // If no changes are made to the row, the timestamp should not be updated.
+ private static final String CREATE_TRIGGER_SQL =
+ "CREATE TRIGGER c_updated_at_trigger\n"
+ + " BEFORE UPDATE ON %s\n"
+ + " REFERENCING NEW AS new_row\n"
+ + " FOR EACH ROW\n"
+ + "BEGIN ATOMIC\n"
+ + "SET new_row.c_updated_at = CURRENT_TIMESTAMP;\n"
+ + "END;";
+
+ private static final List<String> CONFIG_FILE =
+ Lists.newArrayList("/jdbc_db2_source_and_sink_upsert.conf");
+
+ @Override
+ JdbcCase getJdbcCase() {
+ jdbcCase = super.getJdbcCase();
+ jdbcCase.setSinkCreateSql(CREATE_SQL_SINK);
+ jdbcCase.setConfigFile(CONFIG_FILE);
+ jdbcCase.setAdditionalSqlOnSink(CREATE_TRIGGER_SQL);
+ return jdbcCase;
+ }
+
+ @TestTemplate
+ public void testDb2UpsertE2e(TestContainer container)
+ throws IOException, InterruptedException, SQLException {
+ try {
+ // step 1: run the job to migrate data from source to sink.
+ Container.ExecResult execResult =
+
container.executeJob("/jdbc_db2_source_and_sink_upsert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ List<List<Object>> updatedAtTimestampsBeforeUpdate =
+ query(
+ String.format(
+ "SELECT C_UPDATED_AT FROM %s",
+ buildTableInfoWithSchema(DB2_DATABASE,
DB2_SINK)));
+ // step 2: run the same job again to upsert identical data into the sink.
+ // expected: timestamps should not be updated as the data has not
changed.
+ execResult =
container.executeJob("/jdbc_db2_source_and_sink_upsert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ List<List<Object>> updatedAtTimestampsAfterUpdate =
+ query(
+ String.format(
+ "SELECT C_UPDATED_AT FROM %s",
+ buildTableInfoWithSchema(DB2_DATABASE,
DB2_SINK)));
+ Assertions.assertIterableEquals(
+ updatedAtTimestampsBeforeUpdate,
updatedAtTimestampsAfterUpdate);
+ } finally {
+ clearTable(DB2_DATABASE, DB2_SINK);
+ }
+ }
+
+ private List<List<Object>> query(String sql) {
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ List<List<Object>> result = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ ArrayList<Object> objects = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ objects.add(resultSet.getString(i));
+ }
+ result.add(objects);
+ log.debug(String.format("Print query, sql: %s, data: %s", sql,
objects));
+ }
+ connection.commit();
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf
new file mode 100644
index 0000000000..518a027d34
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ Jdbc {
+ driver = com.ibm.db2.jcc.DB2Driver
+ url = "jdbc:db2://db2-e2e:50000/E2E"
+ user = "db2inst1"
+ password = "123456"
+ query = """
+ select * from "E2E".SOURCE;
+ """
+ }
+
+ # If you would like to get more information about how to configure SeaTunnel
and see the full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+sink {
+ Jdbc {
+ driver = com.ibm.db2.jcc.DB2Driver
+ url = "jdbc:db2://db2-e2e:50000/E2E"
+ user = "db2inst1"
+ password = "123456"
+ database = "E2E"
+ table = "SINK"
+ enable_upsert = true
+ # The primary keys of the table, which will be used to generate the upsert
sql
+ generate_sink_sql = true
+ primary_keys = [
+ C_INT
+ ]
+ }
+
+ # If you would like to get more information about how to configure SeaTunnel
and see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}