This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e8d982e25 [Feature][JDBC Sink] Add DM upsert support (#5073)
5e8d982e25 is described below
commit 5e8d982e25dd2a77f366baba0d5876d67e6fc2ad
Author: lihjChina <[email protected]>
AuthorDate: Thu Jul 20 23:13:52 2023 +0800
[Feature][JDBC Sink] Add DM upsert support (#5073)
---------
Co-authored-by: David Zollo <[email protected]>
---
.../jdbc/internal/dialect/dm/DmdbDialect.java | 58 ++++-
.../connectors/seatunnel/jdbc/JdbcDmUpsetIT.java | 258 +++++++++++++++++++++
.../jdbc_dm_source_and_dm_upset_sink.conf | 49 ++++
3 files changed, 364 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index c3a929be29..00845cf11a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -21,7 +21,10 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
public class DmdbDialect implements JdbcDialect {
@@ -43,6 +46,59 @@ public class DmdbDialect implements JdbcDialect {
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
- return Optional.empty();
+ List<String> nonUniqueKeyFields =
+ Arrays.stream(fieldNames)
+ .filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
+ .collect(Collectors.toList());
+ String valuesBinding =
+ Arrays.stream(fieldNames)
+ .map(fieldName -> ":" + fieldName + " " +
quoteIdentifier(fieldName))
+ .collect(Collectors.joining(", "));
+ String usingClause = String.format("SELECT %s", valuesBinding);
+ String onConditions =
+ Arrays.stream(uniqueKeyFields)
+ .map(
+ fieldName ->
+ String.format(
+ "TARGET.%s=SOURCE.%s",
+ quoteIdentifier(fieldName),
+ quoteIdentifier(fieldName)))
+ .collect(Collectors.joining(" AND "));
+
+ String updateSetClause =
+ nonUniqueKeyFields.stream()
+ .map(
+ fieldName ->
+ String.format(
+ "TARGET.%s=SOURCE.%s",
+ quoteIdentifier(fieldName),
+ quoteIdentifier(fieldName)))
+ .collect(Collectors.joining(", "));
+
+ String insertFields =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String insertValues =
+ Arrays.stream(fieldNames)
+ .map(fieldName -> "SOURCE." +
quoteIdentifier(fieldName))
+ .collect(Collectors.joining(", "));
+ String upsertSQL =
+ String.format(
+ " MERGE INTO %s TARGET"
+ + " USING (%s) SOURCE"
+ + " ON (%s) "
+ + " WHEN MATCHED THEN"
+ + " UPDATE SET %s"
+ + " WHEN NOT MATCHED THEN"
+ + " INSERT (%s) VALUES (%s)",
+ tableIdentifier(database, tableName),
+ usingClause,
+ onConditions,
+ updateSetClause,
+ insertFields,
+ insertValues);
+
+ return Optional.of(upsertSQL);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
new file mode 100644
index 0000000000..6533943154
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end test that reads rows from a DM source table and writes them to a DM sink table
+ * through the job defined in {@code jdbc_dm_source_and_dm_upset_sink.conf}.
+ *
+ * <p>The sink table differs from the source table only by a primary key on {@code DM_BIT},
+ * which is the column the job configuration lists under {@code primary_keys}.
+ */
+public class JdbcDmUpsetIT extends AbstractJdbcIT {
+
+    private static final String DM_IMAGE = "laglangyue/dmdb8";
+    private static final String DM_CONTAINER_HOST = "e2e_dmdb_upset";
+
+    private static final String DM_DATABASE = "SYSDBA";
+    private static final String DM_SOURCE = "E2E_TABLE_SOURCE_UPSET";
+    private static final String DM_SINK = "E2E_TABLE_SINK_UPSET";
+    private static final String DM_USERNAME = "SYSDBA";
+    private static final String DM_PASSWORD = "SYSDBA";
+    /** Container-side port; it is the right-hand side of the port binding in initContainer. */
+    private static final int DOCKER_PORT = 5236;
+    /** Host-side port bound to {@link #DOCKER_PORT}; used to build the JDBC URL of the test. */
+    private static final int JDBC_PORT = 5336;
+    private static final String DM_URL = "jdbc:dm://" + HOST + ":%s";
+
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_dm_source_and_dm_upset_sink.conf");
+
+    /**
+     * Column list shared by the source and sink DDL. It starts with the opening parenthesis
+     * and ends right after the last column, so each DDL appends its own tail.
+     */
+    private static final String COLUMN_DEFINITIONS =
+            "(\n"
+                    + " DM_BIT BIT,\n"
+                    + " DM_INT INT,\n"
+                    + " DM_INTEGER INTEGER,\n"
+                    + " DM_TINYINT TINYINT,\n"
+                    + "\n"
+                    + " DM_BYTE BYTE,\n"
+                    + " DM_SMALLINT SMALLINT,\n"
+                    + " DM_BIGINT BIGINT,\n"
+                    + "\n"
+                    + " DM_NUMBER NUMBER,\n"
+                    + " DM_DECIMAL DECIMAL,\n"
+                    + " DM_FLOAT FLOAT,\n"
+                    + " DM_DOUBLE_PRECISION DOUBLE PRECISION,\n"
+                    + " DM_DOUBLE DOUBLE,\n"
+                    + "\n"
+                    + " DM_CHAR CHAR,\n"
+                    + " DM_VARCHAR VARCHAR,\n"
+                    + " DM_VARCHAR2 VARCHAR2,\n"
+                    + " DM_TEXT TEXT,\n"
+                    + " DM_LONG LONG,\n"
+                    + "\n"
+                    + " DM_TIMESTAMP TIMESTAMP,\n"
+                    + " DM_DATETIME DATETIME,\n"
+                    + " DM_DATE DATE";
+
+    /** DDL template for the source table (no key); {@code %s} is the table identifier. */
+    private static final String CREATE_SQL =
+            "create table if not exists %s" + COLUMN_DEFINITIONS + "\n" + ")";
+
+    /** DDL template for the sink table: same columns plus a primary key on DM_BIT. */
+    private static final String CREATE_SINKTABLE_SQL =
+            "create table if not exists %s"
+                    + COLUMN_DEFINITIONS
+                    + ",\n"
+                    + " CONSTRAINT DMPKID PRIMARY KEY (DM_BIT) \n"
+                    + ")";
+
+    /** Assembles the connection, table and test-data settings for this test case. */
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(DM_URL, JDBC_PORT);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(DM_DATABASE, DM_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(DM_IMAGE)
+                .networkAliases(DM_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(DOCKER_PORT)
+                .localPort(DOCKER_PORT)
+                .jdbcTemplate(DM_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(DM_USERNAME)
+                .password(DM_PASSWORD)
+                .database(DM_DATABASE)
+                .sourceTable(DM_SOURCE)
+                .sinkTable(DM_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
+    }
+
+    /** Intentionally empty: this test performs no comparison of the sink contents. */
+    @Override
+    void compareResult() {}
+
+    /**
+     * Creates the source table from the case's DDL template and the sink table from
+     * {@link #CREATE_SINKTABLE_SQL}, then commits.
+     *
+     * @throws SeaTunnelRuntimeException with {@code CREATE_TABLE_FAILED} if any step fails
+     */
+    @Override
+    protected void createNeededTables() {
+        try (Statement statement = connection.createStatement()) {
+            String createTemplate = jdbcCase.getCreateSql();
+
+            String createSource =
+                    String.format(
+                            createTemplate,
+                            buildTableInfoWithSchema(
+                                    jdbcCase.getDatabase(), jdbcCase.getSourceTable()));
+            String createSink =
+                    String.format(
+                            CREATE_SINKTABLE_SQL,
+                            buildTableInfoWithSchema(
+                                    jdbcCase.getDatabase(), jdbcCase.getSinkTable()));
+
+            statement.execute(createSource);
+            statement.execute(createSink);
+            connection.commit();
+        } catch (Exception exception) {
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+    }
+
+    /** Returns the download URL of the DM JDBC driver jar used by the test. */
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";
+    }
+
+    /**
+     * Builds the column names and 100 sample rows for the source table.
+     *
+     * <p>{@code DM_BIT} alternates between 1 and 0, so the rows carry only two distinct values
+     * for the column that is the sink table's primary key.
+     *
+     * @return pair of column names and rows, in matching column order
+     */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "DM_BIT",
+                    "DM_INT",
+                    "DM_INTEGER",
+                    "DM_TINYINT",
+                    "DM_BYTE",
+                    "DM_SMALLINT",
+                    "DM_BIGINT",
+                    "DM_NUMBER",
+                    "DM_DECIMAL",
+                    "DM_FLOAT",
+                    "DM_DOUBLE_PRECISION",
+                    "DM_DOUBLE",
+                    "DM_CHAR",
+                    "DM_VARCHAR",
+                    "DM_VARCHAR2",
+                    "DM_TEXT",
+                    "DM_LONG",
+                    "DM_TIMESTAMP",
+                    "DM_DATETIME",
+                    "DM_DATE"
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i % 2 == 0 ? (byte) 1 : (byte) 0,
+                                i,
+                                i,
+                                Short.valueOf("1"),
+                                Byte.valueOf("1"),
+                                i,
+                                Long.parseLong("1"),
+                                BigDecimal.valueOf(i, 18),
+                                BigDecimal.valueOf(i, 18),
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.1"),
+                                Double.parseDouble("1.1"),
+                                'f',
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("{\"aa\":\"bb_%s\"}", i),
+                                Timestamp.valueOf(LocalDateTime.now()),
+                                new Timestamp(System.currentTimeMillis()),
+                                Date.valueOf(LocalDate.now())
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    /**
+     * Creates the DM container on the shared test network under the alias the job
+     * configuration connects to, and binds host port {@link #JDBC_PORT} to container port
+     * {@link #DOCKER_PORT}.
+     */
+    @Override
+    protected GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(DM_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(DM_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DM_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", JDBC_PORT, DOCKER_PORT)));
+
+        return container;
+    }
+
+    /** Wraps the given field name in double quotes. */
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_dm_source_and_dm_upset_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_dm_source_and_dm_upset_sink.conf
new file mode 100644
index 0000000000..96046c88f8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_dm_source_and_dm_upset_sink.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job that reads every row of the DM source table and writes it to the DM sink table.
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ # e2e_dmdb_upset is the network alias JdbcDmUpsetIT gives the DM container.
+ url = "jdbc:dm://e2e_dmdb_upset:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ query = "select * from SYSDBA.E2E_TABLE_SOURCE_UPSET"
+ }
+
+}
+
+sink {
+ Jdbc {
+ url = "jdbc:dm://e2e_dmdb_upset:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ database = "SYSDBA"
+ # Same column as the PRIMARY KEY constraint in the sink table DDL of JdbcDmUpsetIT.
+ primary_keys = [DM_BIT]
+ table = "E2E_TABLE_SINK_UPSET"
+ # NOTE(review): presumably the empty query plus generate_sink_sql = true makes the
+ # connector generate the write statement from database/table/primary_keys - confirm
+ # against the JDBC sink documentation.
+ generate_sink_sql = true
+ query = ""
+ }
+}
+