This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 15636bdea [Improve][e2e] Unified e2e IT for DaMengDB (#2946)
15636bdea is described below

commit 15636bdea1580196118b942d2d3587e0262d2b8f
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Sep 29 16:36:24 2022 +0800

    [Improve][e2e] Unified e2e IT for DaMengDB (#2946)
    
    * [Improve][e2e] Remove the un-unified e2e IT of DaMengDB
    
    * [Improve][e2e] Unified e2e IT for DaMengDB
---
 pom.xml                                            |   2 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |   3 -
 .../pom.xml                                        |  17 +-
 .../connectors/seatunnel}/jdbc/JdbcDmdbIT.java     | 123 ++++++-------
 .../jdbc/internal/xa/XaGroupOpsImplIT.java         |   0
 .../seatunnel/jdbc/util/JdbcCompareUtil.java       | 100 +++++++++++
 .../src/test/resources/init}/dm_init.conf          |   6 +-
 .../test/resources}/jdbc_dm_source_and_sink.conf   |  12 +-
 .../src/test/resources/log4j.properties            |   0
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   2 +-
 .../common/container/ContainerExtendedFactory.java |   4 +-
 .../e2e/common/container/TestContainer.java        |   2 +-
 .../flink/AbstractTestFlinkContainer.java          |   2 +-
 .../spark/AbstractTestSparkContainer.java          |   2 +-
 .../seatunnel/e2e/common/util/ContainerUtil.java   |   6 +-
 .../connector-jdbc-flink-e2e/pom.xml               |  17 +-
 .../connector-jdbc-spark-e2e/pom.xml               |  15 +-
 .../seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java    | 191 ---------------------
 .../src/test/resources/jdbc/init_sql/dm_init.conf  | 122 -------------
 .../resources/jdbc/jdbc_dm_source_and_sink.conf    | 104 -----------
 20 files changed, 196 insertions(+), 534 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2e3ffe51c..aeea31a22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,7 +181,7 @@
         <docker-maven-plugin.version>0.38.0</docker-maven-plugin.version>
         <p3c-pmd.version>1.3.0</p3c-pmd.version>
         
<maven-scm-provider-jgit.version>1.9.5</maven-scm-provider-jgit.version>
-        <testcontainer.version>1.16.3</testcontainer.version>
+        <testcontainer.version>1.17.3</testcontainer.version>
         <!-- Option args -->
         <skipUT>false</skipUT>
         <skipIT>true</skipIT>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index a9f7f8671..d61865a19 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -76,13 +76,10 @@
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
-            <version>${postgresql.version}</version>
         </dependency>
         <dependency>
             <groupId>com.dameng</groupId>
             <artifactId>DmJdbcDriver18</artifactId>
-            <version>${dm-jdbc.version}</version>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.aliyun.phoenix</groupId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
similarity index 86%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
index f80fb081c..4baadbefd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
@@ -23,11 +23,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-jdbc-it</artifactId>
-
-    <properties>
-        <testcontainers.version>1.17.3</testcontainers.version>
-    </properties>
+    <artifactId>connector-jdbc-e2e</artifactId>
 
     <dependencyManagement>
         <dependencies>
@@ -50,18 +46,25 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- jdbc containers -->
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mysql</artifactId>
-            <version>${testcontainers.version}</version>
+            <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
 
+        <!-- drivers -->
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.dameng</groupId>
+            <artifactId>DmJdbcDriver18</artifactId>
+            <scope>test</scope>
         </dependency>
-
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
similarity index 62%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
index 39004a35b..47da09b8c 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.flink.v2.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
 import static org.awaitility.Awaitility.given;
 
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.util.JdbcCompareUtil;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -46,11 +52,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 @Slf4j
-public class JdbcDmdbIT extends FlinkContainer {
+public class JdbcDmdbIT extends TestSuiteBase implements TestResource {
 
     private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
     private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
-    private static final String HOST = "flink_e2e_dmdb";
+    private static final String HOST = "e2e_dmdb";
     private static final String URL = "jdbc:dm://%s:5236";
     private static final String USERNAME = "SYSDBA";
     private static final String PASSWORD = "SYSDBA";
@@ -58,50 +64,47 @@ public class JdbcDmdbIT extends FlinkContainer {
     private static final String SOURCE_TABLE = "e2e_table_source";
     private static final String SINK_TABLE = "e2e_table_sink";
     private static final String DM_DRIVER_JAR = 
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory = container -> {
+        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DM_DRIVER_JAR);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    };
+
     private Connection jdbcConnection;
     private GenericContainer<?> dbServer;
-    private static final String THIRD_PARTY_PLUGINS_URL = 
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar";
 
-    @BeforeEach
-    public void startDmdbContainer() throws ClassNotFoundException {
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
         dbServer = new GenericContainer<>(DOCKER_IMAGE)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(HOST)
-            .withLogConsumer(new Slf4jLogConsumer(log));
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
         dbServer.setPortBindings(Lists.newArrayList(
-            String.format("%s:%s", 5236, 5236)));
+                String.format("%s:%s", 5236, 5236)));
         Startables.deepStart(Stream.of(dbServer)).join();
         log.info("Dmdb container started");
         // wait for Dmdb fully start
         Class.forName(DRIVER_CLASS);
         given().ignoreExceptions()
-            .await()
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initializeJdbcConnection);
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
         initializeJdbcTable();
     }
 
     private void initializeJdbcConnection() throws SQLException {
         jdbcConnection = DriverManager.getConnection(String.format(
-            URL, dbServer.getHost()), USERNAME, PASSWORD);
-    }
-
-    @Override
-    protected void executeExtraCommands(GenericContainer<?> container) throws 
IOException, InterruptedException {
-        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DM_DRIVER_JAR);
-        Assertions.assertEquals(0, extraCommands.getExitCode());
+                URL, dbServer.getHost()), USERNAME, PASSWORD);
     }
 
     /**
      * init the table for DM_SERVER, DDL and DML for source and sink
      */
     private void initializeJdbcTable() {
-        java.net.URL resource = 
FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
-        if (resource == null) {
-            throw new IllegalArgumentException("can't find find file");
-        }
-        String file = resource.getFile();
-        Config config = ConfigFactory.parseFile(new File(file));
+        File file = ContainerUtil.getResourcesFile("/init/dm_init.conf");
+        Config config = ConfigFactory.parseFile(file);
         assert config.hasPath("dm_table_source") && config.hasPath("DML") && 
config.hasPath("dm_table_sink");
         try (Statement statement = jdbcConnection.createStatement()) {
             // source
@@ -117,18 +120,9 @@ public class JdbcDmdbIT extends FlinkContainer {
         }
     }
 
-    private void assertHasData(String table) {
-        try (Statement statement = jdbcConnection.createStatement()) {
-            String sql = String.format("select * from %s.%s limit 1", 
DATABASE, table);
-            ResultSet source = statement.executeQuery(sql);
-            Assertions.assertTrue(source.next());
-        } catch (SQLException e) {
-            throw new RuntimeException("test dm server image error", e);
-        }
-    }
-
-    @AfterEach
-    public void closeDmdbContainer() throws SQLException {
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
         if (jdbcConnection != null) {
             jdbcConnection.close();
         }
@@ -137,29 +131,36 @@ public class JdbcDmdbIT extends FlinkContainer {
         }
     }
 
-    @Test
-    @DisplayName("JDBC-DM container can be pull")
-    public void testDMDBImage() {
+    @TestTemplate
+    @DisplayName("JDBC-DM end to end test")
+    public void testJdbcDmdb(TestContainer container) throws IOException, 
InterruptedException, SQLException {
         assertHasData(SOURCE_TABLE);
-    }
-
-    @Test
-    @DisplayName("flink JDBC-DM test")
-    public void testJdbcDmdbSourceAndSink() throws IOException, 
InterruptedException, SQLException {
-        assertHasData(SOURCE_TABLE);
-        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+        Container.ExecResult execResult = 
container.executeJob("/jdbc_dm_source_and_sink.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
         assertHasData(SINK_TABLE);
-        JdbcE2eUtil.compare(jdbcConnection, String.format("select * from %s.%s 
limit 1", DATABASE, SOURCE_TABLE),
-            String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE),
-            "DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, 
DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER, "
-                + "DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, 
DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2,"
-                + " DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, 
DM_DATETIME, DM_TIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY, 
DM_LONGVARBINARY");
+        JdbcCompareUtil.compare(jdbcConnection, String.format("select * from 
%s.%s limit 1", DATABASE, SOURCE_TABLE),
+                String.format("select * from %s.%s limit 1", DATABASE, 
SINK_TABLE),
+                "DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, 
DM_BYTE, DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER, "
+                        + "DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, 
DM_DOUBLE_PRECISION, DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2,"
+                        + " DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB, 
DM_TIMESTAMP, DM_DATETIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY, 
DM_LONGVARBINARY");
+        clearSinkTable();
     }
 
-    @Override
-    protected void executeExtraCommands(GenericContainer<?> container) throws 
IOException, InterruptedException {
-        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
-        Assertions.assertEquals(0, extraCommands.getExitCode());
+    private void assertHasData(String table) {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            String sql = String.format("select * from %s.%s limit 1", 
DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    private void clearSinkTable() {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute(String.format("TRUNCATE TABLE %s.%s", DATABASE, 
SINK_TABLE));
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
similarity index 100%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/util/JdbcCompareUtil.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/util/JdbcCompareUtil.java
new file mode 100644
index 000000000..709976d41
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/util/JdbcCompareUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.util;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class JdbcCompareUtil {
+
+    public static void compare(Connection conn, String sourceSql, String 
sinkSQL, String columns) throws SQLException, IOException {
+        String[] split = StringUtils.split(columns, ",");
+        List<String> columnList = Arrays.stream(split)
+            .map(o -> StringUtils.remove(o, "\n"))
+            .map(String::trim)
+            .collect(Collectors.toList());
+        compare(conn, sourceSql, sinkSQL, columnList);
+    }
+
+    @SuppressWarnings("magicnumber")
+    public static void compare(Connection conn, String sourceSql, String 
sinkSQL, List<String> columns) throws SQLException, IOException {
+        Assertions.assertTrue(conn.isValid(10));
+        try (Statement sourceState = conn.createStatement();
+             Statement sinkState = conn.createStatement()
+        ) {
+            ResultSet source = sourceState.executeQuery(sourceSql);
+            ResultSet sink = sinkState.executeQuery(sinkSQL);
+            compareResultSet(source, sink, columns);
+        }
+    }
+
+    private static void compareResultSet(ResultSet source, ResultSet sink, 
List<String> columns) throws SQLException, IOException {
+        Assertions.assertNotNull(source, "source can't be null");
+        Assertions.assertNotNull(sink, "source can't be null");
+        ResultSetMetaData sourceMetaData = source.getMetaData();
+        ResultSetMetaData sinkMetaData = sink.getMetaData();
+        Assertions.assertEquals(sourceMetaData.getColumnCount(), 
sinkMetaData.getColumnCount());
+        while (source.next()) {
+            if (sink.next()) {
+                // compare by name
+                if (CollectionUtils.isNotEmpty(columns)) {
+                    for (String column : columns) {
+                        int sourceCnt = source.findColumn(column);
+                        int sinkCnt = sink.findColumn(column);
+                        Assertions.assertTrue(compareColumn(source, sink, 
sourceCnt, sinkCnt));
+                    }
+                } else {
+                    // compare all column
+                    for (int i = 1; i <= sourceMetaData.getColumnCount(); i++) 
{
+                        Assertions.assertTrue(compareColumn(source, sink, i, 
i));
+                    }
+                }
+                continue;
+            }
+            Assertions.fail("the row of source != sink");
+        }
+    }
+
+    private static boolean compareColumn(ResultSet source, ResultSet sink, int 
sourceCnt, int sinkCnt) throws SQLException, IOException {
+        Object sourceObject = source.getObject(sourceCnt);
+        Object sinkObject = sink.getObject(sinkCnt);
+        if (Objects.deepEquals(sourceObject, sinkObject)) {
+            return true;
+        }
+        InputStream sourceAsciiStream = source.getBinaryStream(sourceCnt);
+        InputStream sinkAsciiStream = sink.getBinaryStream(sinkCnt);
+        String sourceValue = IOUtils.toString(sourceAsciiStream, 
StandardCharsets.UTF_8);
+        String sinkValue = IOUtils.toString(sinkAsciiStream, 
StandardCharsets.UTF_8);
+        return StringUtils.equals(sourceValue, sinkValue);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/init/dm_init.conf
similarity index 94%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/init/dm_init.conf
index 056c252a1..299cc7c36 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/init/dm_init.conf
@@ -49,7 +49,6 @@ create table if not exists "SYSDBA".e2e_table_source
 
     DM_TIMESTAMP        TIMESTAMP,
     DM_DATETIME         DATETIME,
-    DM_TIME             TIME,
     DM_DATE             DATE,
 
     DM_BLOB             BLOB,
@@ -95,7 +94,6 @@ create table if not exists "SYSDBA".e2e_table_sink
 
     DM_TIMESTAMP        TIMESTAMP,
     DM_DATETIME         DATETIME,
-    DM_TIME             TIME,
     DM_DATE             DATE,
 
     DM_BLOB             BLOB,
@@ -112,11 +110,11 @@ INSERT INTO "SYSDBA".e2e_table_source (
 DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, 
DM_BIGINT,
 DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, 
DM_DOUBLE_PRECISION, DM_DOUBLE,
 DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, 
DM_LONGVARCHAR, DM_CLOB,
-DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
+DM_TIMESTAMP, DM_DATETIME, DM_DATE,
 DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
 VALUES
 (0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1',
  'a', 'a', 'a', 'a', 'a', 'a', 'a',
-'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00', 
'2022-08-13',
+'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '2022-08-13',
 null, null, null, null, null, null)
 """
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_dm_source_and_sink.conf
similarity index 78%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_dm_source_and_sink.conf
index ba2468ccf..a536be5e0 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_dm_source_and_sink.conf
@@ -16,16 +16,13 @@
 #
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
 source {
   Jdbc {
-    url = "jdbc:dm://flink_e2e_dmdb:5236"
+    url = "jdbc:dm://e2e_dmdb:5236"
     driver = "dm.jdbc.driver.DmDriver"
     connection_check_timeout_sec = 1000
     user = "SYSDBA"
@@ -36,12 +33,11 @@ source {
 }
 
 transform {
-
 }
 
 sink {
   Jdbc {
-    url = "jdbc:dm://flink_e2e_dmdb:5236"
+    url = "jdbc:dm://e2e_dmdb:5236"
     driver = "dm.jdbc.driver.DmDriver"
     connection_check_timeout_sec = 1000
     user = "SYSDBA"
@@ -49,8 +45,8 @@ sink {
     query = """
 INSERT INTO SYSDBA.e2e_table_sink (DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, 
DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER,
  DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE, 
DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG,
- DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE, 
DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
-VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_DATE, DM_BLOB, 
DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 """
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/log4j.properties
similarity index 100%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/log4j.properties
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 8066bb049..fbc2b214a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -25,7 +25,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>connector-assert-e2e</module>
-        <module>connector-jdbc-it</module>
+        <module>connector-jdbc-e2e</module>
         <module>connector-redis-e2e</module>
     </modules>
 
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
index 3f760a225..e7d635f36 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/ContainerExtendedFactory.java
@@ -19,8 +19,10 @@ package org.apache.seatunnel.e2e.common.container;
 
 import org.testcontainers.containers.GenericContainer;
 
+import java.io.IOException;
+
 @FunctionalInterface
 public interface ContainerExtendedFactory {
 
-    void extend(GenericContainer<?> engineMasterContainer);
+    void extend(GenericContainer<?> engineMasterContainer) throws IOException, 
InterruptedException;
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index 3d17759bb..7189ffdb6 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -30,7 +30,7 @@ public interface TestContainer extends TestResource {
 
     String identifier();
 
-    void executeExtraCommands(ContainerExtendedFactory extendedFactory);
+    void executeExtraCommands(ContainerExtendedFactory extendedFactory) throws 
IOException, InterruptedException;
 
     Container.ExecResult executeJob(String confFile) throws IOException, 
InterruptedException;
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 521b1fa2b..07629c4ac 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -108,7 +108,7 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
         return Collections.emptyList();
     }
 
-    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) 
{
+    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) 
throws IOException, InterruptedException {
         extendedFactory.extend(jobManager);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 27127f1d9..4f3118ab3 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -74,7 +74,7 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
             "--deploy-mode client");
     }
 
-    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) 
{
+    public void executeExtraCommands(ContainerExtendedFactory extendedFactory) 
throws IOException, InterruptedException {
         extendedFactory.extend(master);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 4113ef042..eccf6f740 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -60,7 +60,7 @@ public final class ContainerUtil {
                                                    String connectorPrefix,
                                                    String connectorType,
                                                    String seatunnelHome) {
-        Config jobConfig = getConfig(getConfigFile(confFile));
+        Config jobConfig = getConfig(getResourcesFile(confFile));
         Config connectorsMapping = getConfig(new File(PROJECT_ROOT_PATH + 
File.separator + PLUGIN_MAPPING_FILE));
         if (!connectorsMapping.hasPath(connectorType) || 
connectorsMapping.getConfig(connectorType).isEmpty()) {
             return;
@@ -79,7 +79,7 @@ public final class ContainerUtil {
 
     public static String copyConfigFileToContainer(GenericContainer<?> 
container, String confFile) {
         final String targetConfInContainer = Paths.get("/tmp", 
confFile).toString();
-        
container.copyFileToContainer(MountableFile.forHostPath(getConfigFile(confFile).getAbsolutePath()),
 targetConfInContainer);
+        
container.copyFileToContainer(MountableFile.forHostPath(getResourcesFile(confFile).getAbsolutePath()),
 targetConfInContainer);
         return targetConfInContainer;
     }
 
@@ -158,7 +158,7 @@ public final class ContainerUtil {
         return Paths.get(System.getProperty("user.dir"));
     }
 
-    private static File getConfigFile(String confFile) {
+    public static File getResourcesFile(String confFile) {
         File file = new File(getCurrentModulePath() + "/src/test/resources" + 
confFile);
         if (file.exists()) {
             return file;
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index 5c5abe1ee..d48da1f48 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -25,10 +25,6 @@
 
     <artifactId>connector-jdbc-flink-e2e</artifactId>
 
-    <properties>
-        <testcontainers.version>1.17.3</testcontainers.version>
-    </properties>
-
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -71,21 +67,21 @@
             <scope>test</scope>
         </dependency>
 
-        <!-- jdbc drivers -->
+        <!-- jdbc containers -->
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
-            <version>${testcontainers.version}</version>
+            <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mysql</artifactId>
-            <version>${testcontainers.version}</version>
+            <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
 
+        <!-- jdbc drivers -->
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
@@ -96,11 +92,6 @@
             <artifactId>postgresql</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.dameng</groupId>
-            <artifactId>DmJdbcDriver18</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.aliyun.phoenix</groupId>
             <artifactId>ali-phoenix-shaded-thin-client</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index a55f14f02..7d16b9182 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -25,10 +25,6 @@
 
     <artifactId>connector-jdbc-spark-e2e</artifactId>
 
-    <properties>
-        <testcontainers.version>1.17.3</testcontainers.version>
-    </properties>
-
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -63,14 +59,14 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
-            <version>${testcontainers.version}</version>
+            <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mysql</artifactId>
-            <version>${testcontainers.version}</version>
+            <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
 
@@ -78,18 +74,13 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.dameng</groupId>
-            <artifactId>DmJdbcDriver18</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.aliyun.phoenix</groupId>
             <artifactId>ali-phoenix-shaded-thin-client</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
deleted file mode 100644
index 68a5a0613..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.jdbc;
-
-import static org.awaitility.Awaitility.given;
-
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class JdbcDmdbIT extends SparkContainer {
-
-    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
-    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
-    private static final String HOST = "spark_e2e_dmdb";
-    private static final String URL = "jdbc:dm://%s:5236";
-    private static final String USERNAME = "SYSDBA";
-    private static final String PASSWORD = "SYSDBA";
-    private static final String DATABASE = "SYSDBA";
-    private static final String SOURCE_TABLE = "e2e_table_source";
-    private static final String SINK_TABLE = "e2e_table_sink";
-    public static final String DM_DRIVER_JAR = 
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";
-    private GenericContainer<?> dbServer;
-    private Connection jdbcConnection;
-    private static final String THIRD_PARTY_PLUGINS_URL = 
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar";
-
-    @BeforeEach
-    public void beforeAllForDM() {
-        try {
-            dbServer = new GenericContainer<>(DM_DOCKER_IMAGE)
-                .withNetwork(NETWORK)
-                .withNetworkAliases(HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
-            dbServer.setPortBindings(Lists.newArrayList("5236:5236"));
-            Startables.deepStart(Stream.of(dbServer)).join();
-            log.info("dmdb container started");
-            Class.forName(DRIVER_CLASS);
-            given().ignoreExceptions()
-                .await()
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(this::initializeJdbcConnection);
-            initializeJdbcTable();
-        } catch (Exception ex) {
-            log.error("dm container init failed", ex);
-            throw new RuntimeException(ex);
-        }
-    }
-
-    @AfterEach
-    public void closeDmdbContainer() throws SQLException {
-        if (jdbcConnection != null) {
-            jdbcConnection.close();
-        }
-        if (dbServer != null) {
-            dbServer.close();
-        }
-    }
-
-    @Override
-    protected void executeExtraCommands(GenericContainer<?> container) throws 
IOException, InterruptedException {
-        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib "
-            + "&& cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + 
DM_DRIVER_JAR);
-        Assertions.assertEquals(0, extraCommands.getExitCode());
-    }
-
-    private void initializeJdbcConnection() throws SQLException {
-        jdbcConnection = DriverManager.getConnection(String.format(
-            URL, dbServer.getHost()), USERNAME, PASSWORD);
-    }
-
-    /**
-     * init the table for DM_SERVER, DDL and DML for source and sink
-     */
-    private void initializeJdbcTable() {
-        URL resource = 
JdbcDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
-        if (resource == null) {
-            throw new IllegalArgumentException("can't find find file");
-        }
-        String file = resource.getFile();
-        Config config = ConfigFactory.parseFile(new File(file));
-        assert config.hasPath("dm_table_source") && config.hasPath("DML") && 
config.hasPath("dm_table_sink");
-        try (Statement statement = jdbcConnection.createStatement()) {
-            // source
-            String sourceTableDDL = config.getString("dm_table_source");
-            statement.execute(sourceTableDDL);
-            String insertSQL = config.getString("DML");
-            statement.execute(insertSQL);
-            // sink
-            String sinkTableDDL = config.getString("dm_table_sink");
-            statement.execute(sinkTableDDL);
-        } catch (SQLException e) {
-            throw new RuntimeException("Initializing table failed!", e);
-        }
-    }
-
-    private void assertHasData(String table) {
-        try (Statement statement = jdbcConnection.createStatement()) {
-            String sql = String.format("select * from %s.%s limit 1", 
DATABASE, table);
-            ResultSet source = statement.executeQuery(sql);
-            Assertions.assertTrue(source.next());
-        } catch (SQLException e) {
-            throw new RuntimeException("test dm server image error", e);
-        }
-    }
-
-    @Test
-    @DisplayName("JDBC-DM container can be pull")
-    public void testDMDBImage() {
-        assertHasData(SOURCE_TABLE);
-    }
-
-    @Test
-    @DisplayName("spark JDBC-DM test for all type mapper")
-    public void testDMDBSourceToJdbcSink() throws SQLException, IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/jdbc/jdbc_dm_source_and_sink.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        // assert
-        assertHasData(SINK_TABLE);
-        JdbcE2eUtil.compare(jdbcConnection, String.format("select * from %s.%s 
limit 1", DATABASE, SOURCE_TABLE),
-            String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE),
-            Lists.newArrayList("DM_BIT",
-                "DM_INT",
-                "DM_INTEGER",
-                "DM_PLS_INTEGER",
-                "DM_TINYINT",
-                "DM_BYTE",
-                "DM_SMALLINT",
-                "DM_BIGINT",
-                "DM_NUMERIC",
-                "DM_NUMBER",
-                "DM_DECIMAL",
-                "DM_DEC",
-                "DM_REAL",
-                "DM_FLOAT",
-                "DM_DOUBLE_PRECISION",
-                "DM_DOUBLE",
-                "DM_CHAR",
-                "DM_CHARACTER",
-                "DM_VARCHAR",
-                "DM_VARCHAR2",
-                "DM_TEXT",
-                "DM_LONG",
-                "DM_LONGVARCHAR"));
-    }
-
-    @Override
-    protected void executeExtraCommands(GenericContainer<?> container) throws 
IOException, InterruptedException {
-        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
-        Assertions.assertEquals(0, extraCommands.getExitCode());
-    }
-
-}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
deleted file mode 100644
index 056c252a1..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-dm_table_source = """
-create table if not exists "SYSDBA".e2e_table_source
-(
-    DM_BIT              BIT,
-    DM_INT              INT,
-    DM_INTEGER          INTEGER,
-    DM_PLS_INTEGER      PLS_INTEGER,
-    DM_TINYINT          TINYINT,
-
-    DM_BYTE             BYTE,
-    DM_SMALLINT         SMALLINT,
-    DM_BIGINT           BIGINT,
-
-    DM_NUMERIC          NUMERIC,
-    DM_NUMBER           NUMBER,
-    DM_DECIMAL          DECIMAL,
-    DM_DEC              DEC,
-
-    DM_REAL             REAL,
-    DM_FLOAT            FLOAT,
-    DM_DOUBLE_PRECISION DOUBLE PRECISION,
-    DM_DOUBLE           DOUBLE,
-
-    DM_CHAR             CHAR,
-    DM_CHARACTER        CHARACTER,
-    DM_VARCHAR          VARCHAR,
-    DM_VARCHAR2         VARCHAR2,
-    DM_TEXT             TEXT,
-    DM_LONG             LONG,
-    DM_LONGVARCHAR      LONGVARCHAR,
-    DM_CLOB             CLOB,
-
-    DM_TIMESTAMP        TIMESTAMP,
-    DM_DATETIME         DATETIME,
-    DM_TIME             TIME,
-    DM_DATE             DATE,
-
-    DM_BLOB             BLOB,
-    DM_BINARY           BINARY,
-    DM_VARBINARY        VARBINARY,
-    DM_LONGVARBINARY    LONGVARBINARY,
-    DM_IMAGE            IMAGE,
-    DM_BFILE            BFILE
-)
-"""
-
-dm_table_sink = """
-create table if not exists "SYSDBA".e2e_table_sink
-(
-    DM_BIT              BIT,
-    DM_INT              INT,
-    DM_INTEGER          INTEGER,
-    DM_PLS_INTEGER      PLS_INTEGER,
-    DM_TINYINT          TINYINT,
-
-    DM_BYTE             BYTE,
-    DM_SMALLINT         SMALLINT,
-    DM_BIGINT           BIGINT,
-
-    DM_NUMERIC          NUMERIC,
-    DM_NUMBER           NUMBER,
-    DM_DECIMAL          DECIMAL,
-    DM_DEC              DEC,
-
-    DM_REAL             REAL,
-    DM_FLOAT            FLOAT,
-    DM_DOUBLE_PRECISION DOUBLE PRECISION,
-    DM_DOUBLE           DOUBLE,
-
-    DM_CHAR             CHAR,
-    DM_CHARACTER        CHARACTER,
-    DM_VARCHAR          VARCHAR,
-    DM_VARCHAR2         VARCHAR2,
-    DM_TEXT             TEXT,
-    DM_LONG             LONG,
-    DM_LONGVARCHAR      LONGVARCHAR,
-    DM_CLOB             CLOB,
-
-    DM_TIMESTAMP        TIMESTAMP,
-    DM_DATETIME         DATETIME,
-    DM_TIME             TIME,
-    DM_DATE             DATE,
-
-    DM_BLOB             BLOB,
-    DM_BINARY           BINARY,
-    DM_VARBINARY        VARBINARY,
-    DM_LONGVARBINARY    LONGVARBINARY,
-    DM_IMAGE            IMAGE,
-    DM_BFILE            BFILE
-)
-"""
-// only need for source
-DML = """
-INSERT INTO "SYSDBA".e2e_table_source (
-DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, 
DM_BIGINT,
-DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, 
DM_DOUBLE_PRECISION, DM_DOUBLE,
-DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, 
DM_LONGVARCHAR, DM_CLOB,
-DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
-DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
-VALUES
-(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1',
- 'a', 'a', 'a', 'a', 'a', 'a', 'a',
-'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00', 
'2022-08-13',
-null, null, null, null, null, null)
-"""
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
deleted file mode 100644
index 9a101bacf..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
+++ /dev/null
@@ -1,104 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-  job.mode = "BATCH"
-}
-
-source {
-  Jdbc {
-    url = "jdbc:dm://spark_e2e_dmdb:5236"
-    driver = "dm.jdbc.driver.DmDriver"
-    connection_check_timeout_sec = 1000
-    user = "SYSDBA"
-    password = "SYSDBA"
-    query = """
-    SELECT DM_BIT,
-       DM_INT,
-       DM_INTEGER,
-       DM_PLS_INTEGER,
-       DM_TINYINT,
-       DM_BYTE,
-       DM_SMALLINT,
-       DM_BIGINT,
-       DM_NUMERIC,
-       DM_NUMBER,
-       DM_DECIMAL,
-       DM_DEC,
-       DM_REAL,
-       DM_FLOAT,
-       DM_DOUBLE_PRECISION,
-       DM_DOUBLE,
-       DM_CHAR,
-       DM_CHARACTER,
-       DM_VARCHAR,
-       DM_VARCHAR2,
-       DM_TEXT,
-       DM_LONG,
-       DM_LONGVARCHAR
-FROM "SYSDBA".e2e_table_source
-"""
-    partition_column = "DM_INT"
-  }
-
-}
-
-transform {
-
-}
-
-sink {
-  Jdbc {
-    url = "jdbc:dm://spark_e2e_dmdb:5236"
-    driver = "dm.jdbc.driver.DmDriver"
-    connection_check_timeout_sec = 1000
-    user = "SYSDBA"
-    password = "SYSDBA"
-    query = """
-INSERT INTO "SYSDBA".e2e_table_sink (DM_BIT,
-                                     DM_INT,
-                                     DM_INTEGER,
-                                     DM_PLS_INTEGER,
-                                     DM_TINYINT,
-                                     DM_BYTE,
-                                     DM_SMALLINT,
-                                     DM_BIGINT,
-                                     DM_NUMERIC,
-                                     DM_NUMBER,
-                                     DM_DECIMAL,
-                                     DM_DEC,
-                                     DM_REAL,
-                                     DM_FLOAT,
-                                     DM_DOUBLE_PRECISION,
-                                     DM_DOUBLE,
-                                     DM_CHAR,
-                                     DM_CHARACTER,
-                                     DM_VARCHAR,
-                                     DM_VARCHAR2,
-                                     DM_TEXT,
-                                     DM_LONG,
-                                     DM_LONGVARCHAR)
-VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-"""
-  }
-}
-

Reply via email to