This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f8e61ba  [Improve] fix connection close and add dorise2ecase (#329)
f8e61ba is described below

commit f8e61bab004210a3bb33ff173f06205491788c41
Author: wudi <[email protected]>
AuthorDate: Wed Mar 6 15:11:05 2024 +0800

    [Improve] fix connection close and add dorise2ecase (#329)
    
     add dorise2ecase and improvement test case
---
 .../{build-extension.yml => build-connector.yml}   |   4 +-
 .github/workflows/run-e2ecase-12.yml               |  44 ---------
 .../{run-e2ecase-20.yml => run-e2ecase.yml}        |   4 +-
 .github/workflows/run-itcase-12.yml                |  44 ---------
 .../{run-itcase-20.yml => run-itcase.yml}          |   4 +-
 .../java/org/apache/doris/flink/DorisTestBase.java |  61 ++++++++----
 .../apache/doris/flink/sink/DorisSinkITCase.java   |  31 ++++--
 .../doris/flink/source/DorisSourceITCase.java      |   9 +-
 .../cdc/DorisDorisE2ECase.java}                    | 108 ++++++++++-----------
 .../doris/flink/tools/cdc/MySQLDorisE2ECase.java   |  14 ++-
 10 files changed, 142 insertions(+), 181 deletions(-)

diff --git a/.github/workflows/build-extension.yml 
b/.github/workflows/build-connector.yml
similarity index 97%
rename from .github/workflows/build-extension.yml
rename to .github/workflows/build-connector.yml
index 7259bb4..298894f 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-connector.yml
@@ -16,14 +16,14 @@
 # under the License.
 #
 ---
-name: Build Extensions
+name: Build Connector
 on:
   pull_request:
   push:
 
 jobs:
   build-extension:
-    name: "Build Extensions"
+    name: "Build Connector"
     runs-on: ubuntu-latest
     defaults:
       run:
diff --git a/.github/workflows/run-e2ecase-12.yml 
b/.github/workflows/run-e2ecase-12.yml
deleted file mode 100644
index fd89d20..0000000
--- a/.github/workflows/run-e2ecase-12.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
----
-name: Run E2ECases 1.2
-on:
-  pull_request:
-  push:
-
-jobs:
-  build-extension:
-    name: "Run E2ECases 1.2"
-    runs-on: ubuntu-latest
-    defaults:
-      run:
-        shell: bash
-    steps:
-    - name: Checkout
-      uses: actions/checkout@master
-
-    - name: Setup java
-      uses: actions/setup-java@v2
-      with:
-        distribution: adopt
-        java-version: '8'
-
-    - name: Run E2ECases
-      run: |
-        cd flink-doris-connector && mvn test -Dtest="*E2ECase" 
-Dimage="adamlee489/doris:1.2.7.1_x86"
-
diff --git a/.github/workflows/run-e2ecase-20.yml 
b/.github/workflows/run-e2ecase.yml
similarity index 95%
rename from .github/workflows/run-e2ecase-20.yml
rename to .github/workflows/run-e2ecase.yml
index ebf7ae6..77492ae 100644
--- a/.github/workflows/run-e2ecase-20.yml
+++ b/.github/workflows/run-e2ecase.yml
@@ -16,14 +16,14 @@
 # under the License.
 #
 ---
-name: Run E2ECases 2.0
+name: Run E2ECases
 on:
   pull_request:
   push:
 
 jobs:
   build-extension:
-    name: "Run E2ECases 2.0"
+    name: "Run E2ECases"
     runs-on: ubuntu-latest
     defaults:
       run:
diff --git a/.github/workflows/run-itcase-12.yml 
b/.github/workflows/run-itcase-12.yml
deleted file mode 100644
index cd31c3a..0000000
--- a/.github/workflows/run-itcase-12.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
----
-name: Run ITCases 1.2
-on:
-  pull_request:
-  push:
-
-jobs:
-  build-extension:
-    name: "Run ITCases 1.2"
-    runs-on: ubuntu-latest
-    defaults:
-      run:
-        shell: bash
-    steps:
-    - name: Checkout
-      uses: actions/checkout@master
-
-    - name: Setup java
-      uses: actions/setup-java@v2
-      with:
-        distribution: adopt
-        java-version: '8'
-
-    - name: Run ITCases
-      run: |
-        cd flink-doris-connector && mvn test -Dtest="*ITCase" 
-Dimage="adamlee489/doris:1.2.7.1_x86"
-
diff --git a/.github/workflows/run-itcase-20.yml 
b/.github/workflows/run-itcase.yml
similarity index 96%
rename from .github/workflows/run-itcase-20.yml
rename to .github/workflows/run-itcase.yml
index ad9ef5a..d03ee64 100644
--- a/.github/workflows/run-itcase-20.yml
+++ b/.github/workflows/run-itcase.yml
@@ -16,14 +16,14 @@
 # under the License.
 #
 ---
-name: Run ITCases 2.0
+name: Run ITCases
 on:
   pull_request:
   push:
 
 jobs:
   build-extension:
-    name: "Run ITCases 2.0"
+    name: "Run ITCases"
     runs-on: ubuntu-latest
     defaults:
       run:
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index 278be8c..09ae4bd 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -28,6 +28,9 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerLoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -51,7 +54,11 @@ import static org.awaitility.Durations.ONE_SECOND;
 
 public abstract class DorisTestBase {
     protected static final Logger LOG = 
LoggerFactory.getLogger(DorisTestBase.class);
-    protected static final String DORIS_DOCKER_IMAGE = 
System.getProperty("image");
+    private static final String DEFAULT_DOCKER_IMAGE = 
"adamlee489/doris:2.0.3";
+    protected static final String DORIS_DOCKER_IMAGE =
+            System.getProperty("image") == null
+                    ? DEFAULT_DOCKER_IMAGE
+                    : System.getProperty("image");
     private static final String DRIVER_JAR =
             
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
     protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
@@ -59,7 +66,6 @@ public abstract class DorisTestBase {
     protected static final String USERNAME = "root";
     protected static final String PASSWORD = "";
     protected static final GenericContainer DORIS_CONTAINER = 
createDorisContainer();
-    protected static Connection connection;
     protected static final int DEFAULT_PARALLELISM = 4;
 
     protected static String getFenodes() {
@@ -68,21 +74,21 @@ public abstract class DorisTestBase {
 
     @BeforeClass
     public static void startContainers() {
-        LOG.info("Starting containers...");
+        LOG.info("Starting doris containers...");
         Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
         given().ignoreExceptions()
                 .await()
                 .atMost(300, TimeUnit.SECONDS)
                 .pollInterval(ONE_SECOND)
                 .untilAsserted(DorisTestBase::initializeJdbcConnection);
-        LOG.info("Containers are started.");
+        LOG.info("Containers doris are started.");
     }
 
     @AfterClass
     public static void stopContainers() {
-        LOG.info("Stopping containers...");
+        LOG.info("Stopping doris containers...");
         DORIS_CONTAINER.stop();
-        LOG.info("Containers are stopped.");
+        LOG.info("Containers doris are stopped.");
     }
 
     public static GenericContainer createDorisContainer() {
@@ -90,17 +96,11 @@ public abstract class DorisTestBase {
                 new GenericContainer<>(DORIS_DOCKER_IMAGE)
                         .withNetwork(Network.newNetwork())
                         .withNetworkAliases("DorisContainer")
-                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
-                        .withEnv("FE_ID", "1")
-                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
-                        .withEnv("CURRENT_BE_PORT", "9050")
-                        .withCommand("ulimit -n 65536")
-                        .withCreateContainerCmdModifier(
-                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
                         .withPrivilegedMode(true)
                         .withLogConsumer(
                                 new Slf4jLogConsumer(
-                                        
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
+                                        
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+                        .withReuse(true);
 
         container.setPortBindings(
                 Lists.newArrayList(
@@ -118,10 +118,10 @@ public abstract class DorisTestBase {
                         new URL[] {new URL(DRIVER_JAR)}, 
DorisTestBase.class.getClassLoader());
         LOG.info("Try to connect to Doris...");
         Thread.currentThread().setContextClassLoader(urlClassLoader);
-        connection =
-                DriverManager.getConnection(
-                        String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
-        try (Statement statement = connection.createStatement()) {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             ResultSet resultSet;
             do {
                 LOG.info("Wait for the Backend to start successfully...");
@@ -144,7 +144,12 @@ public abstract class DorisTestBase {
 
     protected static void printClusterStatus() throws Exception {
         LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
-        try (Statement statement = connection.createStatement()) {
+        echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+        echo("sh", "-c", "free -h");
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             ResultSet showFrontends = statement.executeQuery("show frontends");
             LOG.info("Frontends status: {}", convertList(showFrontends));
             ResultSet showBackends = statement.executeQuery("show backends");
@@ -152,6 +157,24 @@ public abstract class DorisTestBase {
         }
     }
 
+    static void echo(String... cmd) {
+        try {
+            Process p = Runtime.getRuntime().exec(cmd);
+            InputStream is = p.getInputStream();
+            BufferedReader reader = new BufferedReader(new 
InputStreamReader(is));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                System.out.println(line);
+            }
+            p.waitFor();
+            is.close();
+            reader.close();
+            p.destroy();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     private static List<Map> convertList(ResultSet rs) throws SQLException {
         List<Map> list = new ArrayList<>();
         ResultSetMetaData metaData = rs.getMetaData();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 26cbc2c..c9501d3 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -30,6 +30,8 @@ import 
org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Arrays;
@@ -45,7 +47,7 @@ import java.util.stream.Stream;
 
 /** DorisSink ITCase with csv and arrow format. */
 public class DorisSinkITCase extends DorisTestBase {
-    static final String DATABASE = "test";
+    static final String DATABASE = "test_sink";
     static final String TABLE_CSV = "tbl_csv";
     static final String TABLE_JSON = "tbl_json";
     static final String TABLE_JSON_TBL = "tbl_json_tbl";
@@ -61,9 +63,13 @@ public class DorisSinkITCase extends DorisTestBase {
 
         Thread.sleep(10000);
         Set<List<Object>> actual = new HashSet<>();
-        try (Statement sinkStatement = connection.createStatement()) {
+
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             ResultSet sinkResultSet =
-                    sinkStatement.executeQuery(
+                    statement.executeQuery(
                             String.format(
                                     "select name,age from %s.%s order by 1", 
DATABASE, TABLE_CSV));
             while (sinkResultSet.next()) {
@@ -102,9 +108,12 @@ public class DorisSinkITCase extends DorisTestBase {
 
         Thread.sleep(10000);
         Set<List<Object>> actual = new HashSet<>();
-        try (Statement sinkStatement = connection.createStatement()) {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             ResultSet sinkResultSet =
-                    sinkStatement.executeQuery(
+                    statement.executeQuery(
                             String.format(
                                     "select name,age from %s.%s order by 1", 
DATABASE, TABLE_JSON));
             while (sinkResultSet.next()) {
@@ -172,9 +181,12 @@ public class DorisSinkITCase extends DorisTestBase {
 
         Thread.sleep(10000);
         Set<List<Object>> actual = new HashSet<>();
-        try (Statement sinkStatement = connection.createStatement()) {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             ResultSet sinkResultSet =
-                    sinkStatement.executeQuery(
+                    statement.executeQuery(
                             String.format(
                                     "select name,age from %s.%s order by 1",
                                     DATABASE, TABLE_JSON_TBL));
@@ -191,7 +203,10 @@ public class DorisSinkITCase extends DorisTestBase {
     }
 
     private void initializeTable(String table) throws Exception {
-        try (Statement statement = connection.createStatement()) {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             statement.execute(String.format("CREATE DATABASE IF NOT EXISTS 
%s", DATABASE));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, table));
             statement.execute(
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index a5a3b53..f88b756 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -32,6 +32,8 @@ import 
org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,7 +41,7 @@ import java.util.List;
 
 /** DorisSource ITCase. */
 public class DorisSourceITCase extends DorisTestBase {
-    static final String DATABASE = "test";
+    static final String DATABASE = "test_source";
     static final String TABLE_READ = "tbl_read";
     static final String TABLE_READ_TBL = "tbl_read_tbl";
 
@@ -111,7 +113,10 @@ public class DorisSourceITCase extends DorisTestBase {
     }
 
     private void initializeTable(String table) throws Exception {
-        try (Statement statement = connection.createStatement()) {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             statement.execute(String.format("CREATE DATABASE IF NOT EXISTS 
%s", DATABASE));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, table));
             statement.execute(
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
similarity index 54%
copy from 
flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
copy to 
flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
index a5a3b53..ad40255 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.source;
+package org.apache.doris.flink.tools.cdc;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -26,59 +25,26 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
 import org.apache.doris.flink.DorisTestBase;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-/** DorisSource ITCase. */
-public class DorisSourceITCase extends DorisTestBase {
-    static final String DATABASE = "test";
-    static final String TABLE_READ = "tbl_read";
-    static final String TABLE_READ_TBL = "tbl_read_tbl";
+/** DorisDorisE2ECase. */
+public class DorisDorisE2ECase extends DorisTestBase {
+    private static final String DATABASE_SOURCE = "test_e2e_source";
+    private static final String DATABASE_SINK = "test_e2e_sink";
+    private static final String TABLE = "test_tbl";
 
     @Test
-    public void testSource() throws Exception {
-        initializeTable(TABLE_READ);
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
-
-        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
-        dorisBuilder
-                .setFenodes(getFenodes())
-                .setTableIdentifier(DATABASE + "." + TABLE_READ)
-                .setUsername(USERNAME)
-                .setPassword(PASSWORD);
-
-        DorisSource<List<?>> source =
-                DorisSource.<List<?>>builder()
-                        .setDorisReadOptions(readOptionBuilder.build())
-                        .setDorisOptions(dorisBuilder.build())
-                        .setDeserializer(new SimpleListDeserializationSchema())
-                        .build();
-        List<Object> actual = new ArrayList<>();
-        try (CloseableIterator<List<?>> iterator =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"Doris Source")
-                        .executeAndCollect()) {
-            while (iterator.hasNext()) {
-                actual.add(iterator.next());
-            }
-        }
-        List<Object> expected =
-                Arrays.asList(Arrays.asList("doris", 18), 
Arrays.asList("flink", 10));
-        Assertions.assertIterableEquals(expected, actual);
-    }
-
-    @Test
-    public void testTableSource() throws Exception {
-        initializeTable(TABLE_READ_TBL);
+    public void testDoris2Doris() throws Exception {
+        initializeDorisTable(TABLE);
+        printClusterStatus();
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -96,10 +62,27 @@ public class DorisSourceITCase extends DorisTestBase {
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s'"
                                 + ")",
-                        getFenodes(), DATABASE + "." + TABLE_READ_TBL, 
USERNAME, PASSWORD);
+                        getFenodes(), DATABASE_SOURCE + "." + TABLE, USERNAME, 
PASSWORD);
         tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("SELECT * FROM 
doris_source");
 
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(), DATABASE_SINK + "." + TABLE, USERNAME, 
PASSWORD);
+        tEnv.executeSql(sinkDDL);
+
+        tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM 
doris_source").await();
+
+        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
         List<Object> actual = new ArrayList<>();
         try (CloseableIterator<Row> iterator = tableResult.collect()) {
             while (iterator.hasNext()) {
@@ -110,10 +93,25 @@ public class DorisSourceITCase extends DorisTestBase {
         Assertions.assertIterableEquals(Arrays.asList(expected), actual);
     }
 
-    private void initializeTable(String table) throws Exception {
-        try (Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS 
%s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, table));
+    private void initializeDorisTable(String table) throws Exception {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS 
%s", DATABASE_SOURCE));
+            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS 
%s", DATABASE_SINK));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE_SOURCE, table));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE_SINK, table));
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256),\n"
+                                    + "`age` int\n"
+                                    + ") DISTRIBUTED BY HASH(`name`) BUCKETS 
1\n"
+                                    + "PROPERTIES (\n"
+                                    + "\"replication_num\" = \"1\"\n"
+                                    + ")\n",
+                            DATABASE_SOURCE, table));
             statement.execute(
                     String.format(
                             "CREATE TABLE %s.%s ( \n"
@@ -123,11 +121,13 @@ public class DorisSourceITCase extends DorisTestBase {
                                     + "PROPERTIES (\n"
                                     + "\"replication_num\" = \"1\"\n"
                                     + ")\n",
-                            DATABASE, table));
+                            DATABASE_SINK, table));
             statement.execute(
-                    String.format("insert into %s.%s  values ('doris',18)", 
DATABASE, table));
+                    String.format(
+                            "insert into %s.%s  values ('doris',18)", 
DATABASE_SOURCE, table));
             statement.execute(
-                    String.format("insert into %s.%s  values ('flink',10)", 
DATABASE, table));
+                    String.format(
+                            "insert into %s.%s  values ('flink',10)", 
DATABASE_SOURCE, table));
         }
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 3390f75..99e7a13 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -63,7 +63,7 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
  */
 public class MySQLDorisE2ECase extends DorisTestBase {
     protected static final Logger LOG = 
LoggerFactory.getLogger(MySQLDorisE2ECase.class);
-    private static final String DATABASE = "test";
+    private static final String DATABASE = "test_e2e_mysql";
     private static final String MYSQL_USER = "root";
     private static final String MYSQL_PASSWD = "123456";
     private static final String TABLE_1 = "tbl1";
@@ -276,7 +276,10 @@ public class MySQLDorisE2ECase extends DorisTestBase {
     }
 
     private void initializeDorisTable() throws Exception {
-        try (Statement statement = connection.createStatement()) {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, TABLE_1));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, TABLE_2));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, TABLE_3));
@@ -287,8 +290,11 @@ public class MySQLDorisE2ECase extends DorisTestBase {
     public void checkResult(Set<List<Object>> expected, String query, int 
columnSize)
             throws Exception {
         Set<List<Object>> actual = new HashSet<>();
-        try (Statement sinkStatement = connection.createStatement()) {
-            ResultSet sinkResultSet = sinkStatement.executeQuery(query);
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+                Statement statement = connection.createStatement()) {
+            ResultSet sinkResultSet = statement.executeQuery(query);
             while (sinkResultSet.next()) {
                 List<Object> row = new ArrayList<>();
                 for (int i = 1; i <= columnSize; i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to