This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 15636bdea [Improve][e2e] Unified e2e IT for DaMengDB (#2946)
15636bdea is described below
commit 15636bdea1580196118b942d2d3587e0262d2b8f
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Sep 29 16:36:24 2022 +0800
[Improve][e2e] Unified e2e IT for DaMengDB (#2946)
* [Improve][e2e] Remove the un-unified e2e IT of DaMengDB
* [Improve][e2e] Unified e2e IT for DaMengDB
---
pom.xml | 2 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 3 -
.../pom.xml | 17 +-
.../connectors/seatunnel}/jdbc/JdbcDmdbIT.java | 123 ++++++-------
.../jdbc/internal/xa/XaGroupOpsImplIT.java | 0
.../seatunnel/jdbc/util/JdbcCompareUtil.java | 100 +++++++++++
.../src/test/resources/init}/dm_init.conf | 6 +-
.../test/resources}/jdbc_dm_source_and_sink.conf | 12 +-
.../src/test/resources/log4j.properties | 0
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 2 +-
.../common/container/ContainerExtendedFactory.java | 4 +-
.../e2e/common/container/TestContainer.java | 2 +-
.../flink/AbstractTestFlinkContainer.java | 2 +-
.../spark/AbstractTestSparkContainer.java | 2 +-
.../seatunnel/e2e/common/util/ContainerUtil.java | 6 +-
.../connector-jdbc-flink-e2e/pom.xml | 17 +-
.../connector-jdbc-spark-e2e/pom.xml | 15 +-
.../seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java | 191 ---------------------
.../src/test/resources/jdbc/init_sql/dm_init.conf | 122 -------------
.../resources/jdbc/jdbc_dm_source_and_sink.conf | 104 -----------
20 files changed, 196 insertions(+), 534 deletions(-)
diff --git a/pom.xml b/pom.xml
index 2e3ffe51c..aeea31a22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,7 +181,7 @@
<docker-maven-plugin.version>0.38.0</docker-maven-plugin.version>
<p3c-pmd.version>1.3.0</p3c-pmd.version>
<maven-scm-provider-jgit.version>1.9.5</maven-scm-provider-jgit.version>
- <testcontainer.version>1.16.3</testcontainer.version>
+ <testcontainer.version>1.17.3</testcontainer.version>
<!-- Option args -->
<skipUT>false</skipUT>
<skipIT>true</skipIT>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index a9f7f8671..d61865a19 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -76,13 +76,10 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
- <version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
- <version>${dm-jdbc.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
similarity index 86%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
index f80fb081c..4baadbefd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
@@ -23,11 +23,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-jdbc-it</artifactId>
-
- <properties>
- <testcontainers.version>1.17.3</testcontainers.version>
- </properties>
+ <artifactId>connector-jdbc-e2e</artifactId>
<dependencyManagement>
<dependencies>
@@ -50,18 +46,25 @@
<scope>test</scope>
</dependency>
+ <!-- jdbc containers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
- <version>${testcontainers.version}</version>
+ <version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
+ <!-- drivers -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.dameng</groupId>
+ <artifactId>DmJdbcDriver18</artifactId>
+ <scope>test</scope>
</dependency>
-
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
similarity index 62%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
index 39004a35b..47da09b8c 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.flink.v2.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
import static org.awaitility.Awaitility.given;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.util.JdbcCompareUtil;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -46,11 +52,11 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@Slf4j
-public class JdbcDmdbIT extends FlinkContainer {
+public class JdbcDmdbIT extends TestSuiteBase implements TestResource {
private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
- private static final String HOST = "flink_e2e_dmdb";
+ private static final String HOST = "e2e_dmdb";
private static final String URL = "jdbc:dm://%s:5236";
private static final String USERNAME = "SYSDBA";
private static final String PASSWORD = "SYSDBA";
@@ -58,50 +64,47 @@ public class JdbcDmdbIT extends FlinkContainer {
private static final String SOURCE_TABLE = "e2e_table_source";
private static final String SINK_TABLE = "e2e_table_sink";
private static final String DM_DRIVER_JAR =
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory = container -> {
+ Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DM_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
private Connection jdbcConnection;
private GenericContainer<?> dbServer;
- private static final String THIRD_PARTY_PLUGINS_URL =
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar";
- @BeforeEach
- public void startDmdbContainer() throws ClassNotFoundException {
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
dbServer = new GenericContainer<>(DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
dbServer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", 5236, 5236)));
+ String.format("%s:%s", 5236, 5236)));
Startables.deepStart(Stream.of(dbServer)).join();
log.info("Dmdb container started");
// wait for Dmdb fully start
Class.forName(DRIVER_CLASS);
given().ignoreExceptions()
- .await()
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initializeJdbcConnection);
+ .await()
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initializeJdbcConnection);
initializeJdbcTable();
}
private void initializeJdbcConnection() throws SQLException {
jdbcConnection = DriverManager.getConnection(String.format(
- URL, dbServer.getHost()), USERNAME, PASSWORD);
- }
-
- @Override
- protected void executeExtraCommands(GenericContainer<?> container) throws
IOException, InterruptedException {
- Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DM_DRIVER_JAR);
- Assertions.assertEquals(0, extraCommands.getExitCode());
+ URL, dbServer.getHost()), USERNAME, PASSWORD);
}
/**
* init the table for DM_SERVER, DDL and DML for source and sink
*/
private void initializeJdbcTable() {
- java.net.URL resource =
FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
- if (resource == null) {
- throw new IllegalArgumentException("can't find find file");
- }
- String file = resource.getFile();
- Config config = ConfigFactory.parseFile(new File(file));
+ File file = ContainerUtil.getResourcesFile("/init/dm_init.conf");
+ Config config = ConfigFactory.parseFile(file);
assert config.hasPath("dm_table_source") && config.hasPath("DML") &&
config.hasPath("dm_table_sink");
try (Statement statement = jdbcConnection.createStatement()) {
// source
@@ -117,18 +120,9 @@ public class JdbcDmdbIT extends FlinkContainer {
}
}
- private void assertHasData(String table) {
- try (Statement statement = jdbcConnection.createStatement()) {
- String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
- ResultSet source = statement.executeQuery(sql);
- Assertions.assertTrue(source.next());
- } catch (SQLException e) {
- throw new RuntimeException("test dm server image error", e);
- }
- }
-
- @AfterEach
- public void closeDmdbContainer() throws SQLException {
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
if (jdbcConnection != null) {
jdbcConnection.close();
}
@@ -137,29 +131,36 @@ public class JdbcDmdbIT extends FlinkContainer {
}
}
- @Test
- @DisplayName("JDBC-DM container can be pull")
- public void testDMDBImage() {
+ @TestTemplate
+ @DisplayName("JDBC-DM end to end test")
+ public void testJdbcDmdb(TestContainer container) throws IOException,
InterruptedException, SQLException {
assertHasData(SOURCE_TABLE);
- }
-
- @Test
- @DisplayName("flink JDBC-DM test")
- public void testJdbcDmdbSourceAndSink() throws IOException,
InterruptedException, SQLException {
- assertHasData(SOURCE_TABLE);
- Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+ Container.ExecResult execResult =
container.executeJob("/jdbc_dm_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
assertHasData(SINK_TABLE);
- JdbcE2eUtil.compare(jdbcConnection, String.format("select * from %s.%s
limit 1", DATABASE, SOURCE_TABLE),
- String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE),
- "DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE,
DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER, "
- + "DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION,
DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2,"
- + " DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP,
DM_DATETIME, DM_TIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY,
DM_LONGVARBINARY");
+ JdbcCompareUtil.compare(jdbcConnection, String.format("select * from
%s.%s limit 1", DATABASE, SOURCE_TABLE),
+ String.format("select * from %s.%s limit 1", DATABASE,
SINK_TABLE),
+ "DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT,
DM_BYTE, DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER, "
+ + "DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT,
DM_DOUBLE_PRECISION, DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2,"
+ + " DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB,
DM_TIMESTAMP, DM_DATETIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY,
DM_LONGVARBINARY");
+ clearSinkTable();
}
- @Override
- protected void executeExtraCommands(GenericContainer<?> container) throws
IOException, InterruptedException {
- Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
- Assertions.assertEquals(0, extraCommands.getExitCode());
+ private void assertHasData(String table) {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
+ ResultSet source = statement.executeQuery(sql);
+ Assertions.assertTrue(source.next());
+ } catch (SQLException e) {
+ throw new RuntimeException("test dm server image error", e);
+ }
+ }
+
+ private void clearSinkTable() {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ statement.execute(String.format("TRUNCATE TABLE %s.%s", DATABASE,
SINK_TABLE));
+ } catch (SQLException e) {
+ throw new RuntimeException("test dm server image error", e);
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
similarity index 100%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/util/JdbcCompareUtil.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/util/JdbcCompareUtil.java
new file mode 100644
index 000000000..709976d41
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/util/JdbcCompareUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.util;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Test helper that asserts two JDBC queries, run on the same connection, return the same data.
+ *
+ * <p>Rows are paired in fetch order and compared column by column; any mismatch fails the
+ * calling test through a JUnit assertion.
+ */
+public class JdbcCompareUtil {
+
+    /** Timeout, in seconds, passed to {@link Connection#isValid(int)} before any query is issued. */
+    private static final int CONNECTION_VALID_TIMEOUT_SECONDS = 10;
+
+    /**
+     * Compares the results of two queries on the columns named in a comma-separated string.
+     *
+     * @param conn      connection both queries are executed on
+     * @param sourceSql query producing the expected rows
+     * @param sinkSQL   query producing the actual rows
+     * @param columns   comma-separated column names; line breaks and surrounding blanks are
+     *                  stripped from each name. A null or blank value compares every column
+     *                  by position.
+     * @throws SQLException if a query or a column access fails
+     * @throws IOException  if a stream-backed column value cannot be read
+     */
+    public static void compare(Connection conn, String sourceSql, String sinkSQL, String columns) throws SQLException, IOException {
+        // defaultString turns null into "", which split() maps to an empty array instead of null.
+        String[] names = StringUtils.split(StringUtils.defaultString(columns), ",");
+        List<String> columnList = Arrays.stream(names)
+            .map(name -> StringUtils.remove(name, "\n"))
+            .map(String::trim)
+            // "a, ,b" would otherwise hand an empty name to ResultSet#findColumn.
+            .filter(StringUtils::isNotEmpty)
+            .collect(Collectors.toList());
+        compare(conn, sourceSql, sinkSQL, columnList);
+    }
+
+    /**
+     * Compares the results of two queries on the given columns.
+     *
+     * @param conn      connection both queries are executed on; must still be valid
+     * @param sourceSql query producing the expected rows
+     * @param sinkSQL   query producing the actual rows
+     * @param columns   column names to compare; null or empty compares every column by position
+     * @throws SQLException if a query or a column access fails
+     * @throws IOException  if a stream-backed column value cannot be read
+     */
+    public static void compare(Connection conn, String sourceSql, String sinkSQL, List<String> columns) throws SQLException, IOException {
+        Assertions.assertTrue(conn.isValid(CONNECTION_VALID_TIMEOUT_SECONDS));
+        try (Statement sourceState = conn.createStatement();
+             Statement sinkState = conn.createStatement();
+             ResultSet source = sourceState.executeQuery(sourceSql);
+             ResultSet sink = sinkState.executeQuery(sinkSQL)) {
+            compareResultSet(source, sink, columns);
+        }
+    }
+
+    /**
+     * Walks both result sets in lock step and asserts that every paired row matches and that
+     * both sets contain the same number of rows.
+     */
+    private static void compareResultSet(ResultSet source, ResultSet sink, List<String> columns) throws SQLException, IOException {
+        Assertions.assertNotNull(source, "source can't be null");
+        Assertions.assertNotNull(sink, "sink can't be null");
+        int columnCount = source.getMetaData().getColumnCount();
+        Assertions.assertEquals(columnCount, sink.getMetaData().getColumnCount());
+        boolean compareByName = CollectionUtils.isNotEmpty(columns);
+        while (source.next()) {
+            if (!sink.next()) {
+                Assertions.fail("the row of source != sink");
+            }
+            if (compareByName) {
+                for (String column : columns) {
+                    int sourceCnt = source.findColumn(column);
+                    int sinkCnt = sink.findColumn(column);
+                    Assertions.assertTrue(compareColumn(source, sink, sourceCnt, sinkCnt),
+                        "column " + column + " of source != sink");
+                }
+            } else {
+                for (int i = 1; i <= columnCount; i++) {
+                    Assertions.assertTrue(compareColumn(source, sink, i, i),
+                        "column #" + i + " of source != sink");
+                }
+            }
+        }
+        // The loop above only proves sink has at least as many rows as source.
+        Assertions.assertFalse(sink.next(), "the row of source != sink");
+    }
+
+    /**
+     * Compares one column of the current rows.
+     *
+     * <p>The values are first compared as objects; when that fails and both are non-null,
+     * the column is re-read as a binary stream from each side and the UTF-8 decoded contents
+     * are compared.
+     *
+     * @return true when the two values are considered equal
+     */
+    private static boolean compareColumn(ResultSet source, ResultSet sink, int sourceCnt, int sinkCnt) throws SQLException, IOException {
+        Object sourceObject = source.getObject(sourceCnt);
+        Object sinkObject = sink.getObject(sinkCnt);
+        if (Objects.deepEquals(sourceObject, sinkObject)) {
+            return true;
+        }
+        // deepEquals already accepted null == null, so a null here means exactly one side is null.
+        if (sourceObject == null || sinkObject == null) {
+            return false;
+        }
+        try (InputStream sourceStream = source.getBinaryStream(sourceCnt);
+             InputStream sinkStream = sink.getBinaryStream(sinkCnt)) {
+            // getBinaryStream may return null; IOUtils.toString would then throw an NPE.
+            if (sourceStream == null || sinkStream == null) {
+                return false;
+            }
+            String sourceValue = IOUtils.toString(sourceStream, StandardCharsets.UTF_8);
+            String sinkValue = IOUtils.toString(sinkStream, StandardCharsets.UTF_8);
+            return StringUtils.equals(sourceValue, sinkValue);
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/init/dm_init.conf
similarity index 94%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/init/dm_init.conf
index 056c252a1..299cc7c36 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/init/dm_init.conf
@@ -49,7 +49,6 @@ create table if not exists "SYSDBA".e2e_table_source
DM_TIMESTAMP TIMESTAMP,
DM_DATETIME DATETIME,
- DM_TIME TIME,
DM_DATE DATE,
DM_BLOB BLOB,
@@ -95,7 +94,6 @@ create table if not exists "SYSDBA".e2e_table_sink
DM_TIMESTAMP TIMESTAMP,
DM_DATETIME DATETIME,
- DM_TIME TIME,
DM_DATE DATE,
DM_BLOB BLOB,
@@ -112,11 +110,11 @@ INSERT INTO "SYSDBA".e2e_table_source (
DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT,
DM_BIGINT,
DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT,
DM_DOUBLE_PRECISION, DM_DOUBLE,
DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG,
DM_LONGVARCHAR, DM_CLOB,
-DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
+DM_TIMESTAMP, DM_DATETIME, DM_DATE,
DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
VALUES
(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1',
'a', 'a', 'a', 'a', 'a', 'a', 'a',
-'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00',
'2022-08-13',
+'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '2022-08-13',
null, null, null, null, null, null)
"""
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_dm_source_and_sink.conf
similarity index 78%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_dm_source_and_sink.conf
index ba2468ccf..a536be5e0 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_dm_source_and_sink.conf
@@ -16,16 +16,13 @@
#
env {
- # You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
Jdbc {
- url = "jdbc:dm://flink_e2e_dmdb:5236"
+ url = "jdbc:dm://e2e_dmdb:5236"
driver = "dm.jdbc.driver.DmDriver"
connection_check_timeout_sec = 1000
user = "SYSDBA"
@@ -36,12 +33,11 @@ source {
}
transform {
-
}
sink {
Jdbc {
- url = "jdbc:dm://flink_e2e_dmdb:5236"
+ url = "jdbc:dm://e2e_dmdb:5236"
driver = "dm.jdbc.driver.DmDriver"
connection_check_timeout_sec = 1000
user = "SYSDBA"
@@ -49,8 +45,8 @@ sink {
query = """
INSERT INTO SYSDBA.e2e_table_sink (DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER,
DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER,
DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE,
DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG,
- DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
-VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_DATE, DM_BLOB,
DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/log4j.properties
similarity index 100%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/log4j.properties
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 8066bb049..fbc2b214a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -25,7 +25,7 @@
<packaging>pom</packaging>
<modules>
<module>connector-assert-e2e</module>
- <module>connector-jdbc-it</module>
+ <module>connector-jdbc-e2e</module>
<module>connector-redis-e2e</module>
</modules>
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
index 3f760a225..e7d635f36 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
@@ -19,8 +19,10 @@ package org.apache.seatunnel.e2e.common.container;
import org.testcontainers.containers.GenericContainer;
+import java.io.IOException;
+
@FunctionalInterface
public interface ContainerExtendedFactory {
- void extend(GenericContainer<?> engineMasterContainer);
+ void extend(GenericContainer<?> engineMasterContainer) throws IOException,
InterruptedException;
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index 3d17759bb..7189ffdb6 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -30,7 +30,7 @@ public interface TestContainer extends TestResource {
String identifier();
- void executeExtraCommands(ContainerExtendedFactory extendedFactory);
+ void executeExtraCommands(ContainerExtendedFactory extendedFactory) throws
IOException, InterruptedException;
Container.ExecResult executeJob(String confFile) throws IOException,
InterruptedException;
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 521b1fa2b..07629c4ac 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -108,7 +108,7 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
return Collections.emptyList();
}
- public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
{
+ public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
throws IOException, InterruptedException {
extendedFactory.extend(jobManager);
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 27127f1d9..4f3118ab3 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -74,7 +74,7 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
"--deploy-mode client");
}
- public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
{
+ public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
throws IOException, InterruptedException {
extendedFactory.extend(master);
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 4113ef042..eccf6f740 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -60,7 +60,7 @@ public final class ContainerUtil {
String connectorPrefix,
String connectorType,
String seatunnelHome) {
- Config jobConfig = getConfig(getConfigFile(confFile));
+ Config jobConfig = getConfig(getResourcesFile(confFile));
Config connectorsMapping = getConfig(new File(PROJECT_ROOT_PATH +
File.separator + PLUGIN_MAPPING_FILE));
if (!connectorsMapping.hasPath(connectorType) ||
connectorsMapping.getConfig(connectorType).isEmpty()) {
return;
@@ -79,7 +79,7 @@ public final class ContainerUtil {
public static String copyConfigFileToContainer(GenericContainer<?>
container, String confFile) {
final String targetConfInContainer = Paths.get("/tmp",
confFile).toString();
-
container.copyFileToContainer(MountableFile.forHostPath(getConfigFile(confFile).getAbsolutePath()),
targetConfInContainer);
+
container.copyFileToContainer(MountableFile.forHostPath(getResourcesFile(confFile).getAbsolutePath()),
targetConfInContainer);
return targetConfInContainer;
}
@@ -158,7 +158,7 @@ public final class ContainerUtil {
return Paths.get(System.getProperty("user.dir"));
}
- private static File getConfigFile(String confFile) {
+ public static File getResourcesFile(String confFile) {
File file = new File(getCurrentModulePath() + "/src/test/resources" +
confFile);
if (file.exists()) {
return file;
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index 5c5abe1ee..d48da1f48 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -25,10 +25,6 @@
<artifactId>connector-jdbc-flink-e2e</artifactId>
- <properties>
- <testcontainers.version>1.17.3</testcontainers.version>
- </properties>
-
<dependencyManagement>
<dependencies>
<dependency>
@@ -71,21 +67,21 @@
<scope>test</scope>
</dependency>
- <!-- jdbc drivers -->
+ <!-- jdbc containers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
- <version>${testcontainers.version}</version>
+ <version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
- <version>${testcontainers.version}</version>
+ <version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
+ <!-- jdbc drivers -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
@@ -96,11 +92,6 @@
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.dameng</groupId>
- <artifactId>DmJdbcDriver18</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index a55f14f02..7d16b9182 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -25,10 +25,6 @@
<artifactId>connector-jdbc-spark-e2e</artifactId>
- <properties>
- <testcontainers.version>1.17.3</testcontainers.version>
- </properties>
-
<dependencyManagement>
<dependencies>
<dependency>
@@ -63,14 +59,14 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
- <version>${testcontainers.version}</version>
+ <version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
- <version>${testcontainers.version}</version>
+ <version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
@@ -78,18 +74,13 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.dameng</groupId>
- <artifactId>DmJdbcDriver18</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
deleted file mode 100644
index 68a5a0613..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.jdbc;
-
-import static org.awaitility.Awaitility.given;
-
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class JdbcDmdbIT extends SparkContainer {
-
- private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
- private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
- private static final String HOST = "spark_e2e_dmdb";
- private static final String URL = "jdbc:dm://%s:5236";
- private static final String USERNAME = "SYSDBA";
- private static final String PASSWORD = "SYSDBA";
- private static final String DATABASE = "SYSDBA";
- private static final String SOURCE_TABLE = "e2e_table_source";
- private static final String SINK_TABLE = "e2e_table_sink";
- public static final String DM_DRIVER_JAR =
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";
- private GenericContainer<?> dbServer;
- private Connection jdbcConnection;
- private static final String THIRD_PARTY_PLUGINS_URL =
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar";
-
- @BeforeEach
- public void beforeAllForDM() {
- try {
- dbServer = new GenericContainer<>(DM_DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
- dbServer.setPortBindings(Lists.newArrayList("5236:5236"));
- Startables.deepStart(Stream.of(dbServer)).join();
- log.info("dmdb container started");
- Class.forName(DRIVER_CLASS);
- given().ignoreExceptions()
- .await()
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initializeJdbcConnection);
- initializeJdbcTable();
- } catch (Exception ex) {
- log.error("dm container init failed", ex);
- throw new RuntimeException(ex);
- }
- }
-
- @AfterEach
- public void closeDmdbContainer() throws SQLException {
- if (jdbcConnection != null) {
- jdbcConnection.close();
- }
- if (dbServer != null) {
- dbServer.close();
- }
- }
-
- @Override
- protected void executeExtraCommands(GenericContainer<?> container) throws
IOException, InterruptedException {
- Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib "
- + "&& cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " +
DM_DRIVER_JAR);
- Assertions.assertEquals(0, extraCommands.getExitCode());
- }
-
- private void initializeJdbcConnection() throws SQLException {
- jdbcConnection = DriverManager.getConnection(String.format(
- URL, dbServer.getHost()), USERNAME, PASSWORD);
- }
-
- /**
- * init the table for DM_SERVER, DDL and DML for source and sink
- */
- private void initializeJdbcTable() {
- URL resource =
JdbcDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
- if (resource == null) {
- throw new IllegalArgumentException("can't find find file");
- }
- String file = resource.getFile();
- Config config = ConfigFactory.parseFile(new File(file));
- assert config.hasPath("dm_table_source") && config.hasPath("DML") &&
config.hasPath("dm_table_sink");
- try (Statement statement = jdbcConnection.createStatement()) {
- // source
- String sourceTableDDL = config.getString("dm_table_source");
- statement.execute(sourceTableDDL);
- String insertSQL = config.getString("DML");
- statement.execute(insertSQL);
- // sink
- String sinkTableDDL = config.getString("dm_table_sink");
- statement.execute(sinkTableDDL);
- } catch (SQLException e) {
- throw new RuntimeException("Initializing table failed!", e);
- }
- }
-
- private void assertHasData(String table) {
- try (Statement statement = jdbcConnection.createStatement()) {
- String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
- ResultSet source = statement.executeQuery(sql);
- Assertions.assertTrue(source.next());
- } catch (SQLException e) {
- throw new RuntimeException("test dm server image error", e);
- }
- }
-
- @Test
- @DisplayName("JDBC-DM container can be pull")
- public void testDMDBImage() {
- assertHasData(SOURCE_TABLE);
- }
-
- @Test
- @DisplayName("spark JDBC-DM test for all type mapper")
- public void testDMDBSourceToJdbcSink() throws SQLException, IOException,
InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_dm_source_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- // assert
- assertHasData(SINK_TABLE);
- JdbcE2eUtil.compare(jdbcConnection, String.format("select * from %s.%s
limit 1", DATABASE, SOURCE_TABLE),
- String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE),
- Lists.newArrayList("DM_BIT",
- "DM_INT",
- "DM_INTEGER",
- "DM_PLS_INTEGER",
- "DM_TINYINT",
- "DM_BYTE",
- "DM_SMALLINT",
- "DM_BIGINT",
- "DM_NUMERIC",
- "DM_NUMBER",
- "DM_DECIMAL",
- "DM_DEC",
- "DM_REAL",
- "DM_FLOAT",
- "DM_DOUBLE_PRECISION",
- "DM_DOUBLE",
- "DM_CHAR",
- "DM_CHARACTER",
- "DM_VARCHAR",
- "DM_VARCHAR2",
- "DM_TEXT",
- "DM_LONG",
- "DM_LONGVARCHAR"));
- }
-
- @Override
- protected void executeExtraCommands(GenericContainer<?> container) throws
IOException, InterruptedException {
- Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
- Assertions.assertEquals(0, extraCommands.getExitCode());
- }
-
-}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
deleted file mode 100644
index 056c252a1..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-dm_table_source = """
-create table if not exists "SYSDBA".e2e_table_source
-(
- DM_BIT BIT,
- DM_INT INT,
- DM_INTEGER INTEGER,
- DM_PLS_INTEGER PLS_INTEGER,
- DM_TINYINT TINYINT,
-
- DM_BYTE BYTE,
- DM_SMALLINT SMALLINT,
- DM_BIGINT BIGINT,
-
- DM_NUMERIC NUMERIC,
- DM_NUMBER NUMBER,
- DM_DECIMAL DECIMAL,
- DM_DEC DEC,
-
- DM_REAL REAL,
- DM_FLOAT FLOAT,
- DM_DOUBLE_PRECISION DOUBLE PRECISION,
- DM_DOUBLE DOUBLE,
-
- DM_CHAR CHAR,
- DM_CHARACTER CHARACTER,
- DM_VARCHAR VARCHAR,
- DM_VARCHAR2 VARCHAR2,
- DM_TEXT TEXT,
- DM_LONG LONG,
- DM_LONGVARCHAR LONGVARCHAR,
- DM_CLOB CLOB,
-
- DM_TIMESTAMP TIMESTAMP,
- DM_DATETIME DATETIME,
- DM_TIME TIME,
- DM_DATE DATE,
-
- DM_BLOB BLOB,
- DM_BINARY BINARY,
- DM_VARBINARY VARBINARY,
- DM_LONGVARBINARY LONGVARBINARY,
- DM_IMAGE IMAGE,
- DM_BFILE BFILE
-)
-"""
-
-dm_table_sink = """
-create table if not exists "SYSDBA".e2e_table_sink
-(
- DM_BIT BIT,
- DM_INT INT,
- DM_INTEGER INTEGER,
- DM_PLS_INTEGER PLS_INTEGER,
- DM_TINYINT TINYINT,
-
- DM_BYTE BYTE,
- DM_SMALLINT SMALLINT,
- DM_BIGINT BIGINT,
-
- DM_NUMERIC NUMERIC,
- DM_NUMBER NUMBER,
- DM_DECIMAL DECIMAL,
- DM_DEC DEC,
-
- DM_REAL REAL,
- DM_FLOAT FLOAT,
- DM_DOUBLE_PRECISION DOUBLE PRECISION,
- DM_DOUBLE DOUBLE,
-
- DM_CHAR CHAR,
- DM_CHARACTER CHARACTER,
- DM_VARCHAR VARCHAR,
- DM_VARCHAR2 VARCHAR2,
- DM_TEXT TEXT,
- DM_LONG LONG,
- DM_LONGVARCHAR LONGVARCHAR,
- DM_CLOB CLOB,
-
- DM_TIMESTAMP TIMESTAMP,
- DM_DATETIME DATETIME,
- DM_TIME TIME,
- DM_DATE DATE,
-
- DM_BLOB BLOB,
- DM_BINARY BINARY,
- DM_VARBINARY VARBINARY,
- DM_LONGVARBINARY LONGVARBINARY,
- DM_IMAGE IMAGE,
- DM_BFILE BFILE
-)
-"""
-// only need for source
-DML = """
-INSERT INTO "SYSDBA".e2e_table_source (
-DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT,
DM_BIGINT,
-DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT,
DM_DOUBLE_PRECISION, DM_DOUBLE,
-DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG,
DM_LONGVARCHAR, DM_CLOB,
-DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
-DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
-VALUES
-(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1',
- 'a', 'a', 'a', 'a', 'a', 'a', 'a',
-'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00',
'2022-08-13',
-null, null, null, null, null, null)
-"""
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
deleted file mode 100644
index 9a101bacf..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
+++ /dev/null
@@ -1,104 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
- job.mode = "BATCH"
-}
-
-source {
- Jdbc {
- url = "jdbc:dm://spark_e2e_dmdb:5236"
- driver = "dm.jdbc.driver.DmDriver"
- connection_check_timeout_sec = 1000
- user = "SYSDBA"
- password = "SYSDBA"
- query = """
- SELECT DM_BIT,
- DM_INT,
- DM_INTEGER,
- DM_PLS_INTEGER,
- DM_TINYINT,
- DM_BYTE,
- DM_SMALLINT,
- DM_BIGINT,
- DM_NUMERIC,
- DM_NUMBER,
- DM_DECIMAL,
- DM_DEC,
- DM_REAL,
- DM_FLOAT,
- DM_DOUBLE_PRECISION,
- DM_DOUBLE,
- DM_CHAR,
- DM_CHARACTER,
- DM_VARCHAR,
- DM_VARCHAR2,
- DM_TEXT,
- DM_LONG,
- DM_LONGVARCHAR
-FROM "SYSDBA".e2e_table_source
-"""
- partition_column = "DM_INT"
- }
-
-}
-
-transform {
-
-}
-
-sink {
- Jdbc {
- url = "jdbc:dm://spark_e2e_dmdb:5236"
- driver = "dm.jdbc.driver.DmDriver"
- connection_check_timeout_sec = 1000
- user = "SYSDBA"
- password = "SYSDBA"
- query = """
-INSERT INTO "SYSDBA".e2e_table_sink (DM_BIT,
- DM_INT,
- DM_INTEGER,
- DM_PLS_INTEGER,
- DM_TINYINT,
- DM_BYTE,
- DM_SMALLINT,
- DM_BIGINT,
- DM_NUMERIC,
- DM_NUMBER,
- DM_DECIMAL,
- DM_DEC,
- DM_REAL,
- DM_FLOAT,
- DM_DOUBLE_PRECISION,
- DM_DOUBLE,
- DM_CHAR,
- DM_CHARACTER,
- DM_VARCHAR,
- DM_VARCHAR2,
- DM_TEXT,
- DM_LONG,
- DM_LONGVARCHAR)
-VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-"""
- }
-}
-