This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f8e61ba [Improve] fix connection close and add dorise2ecase (#329)
f8e61ba is described below
commit f8e61bab004210a3bb33ff173f06205491788c41
Author: wudi <[email protected]>
AuthorDate: Wed Mar 6 15:11:05 2024 +0800
[Improve] fix connection close and add dorise2ecase (#329)
Add the Doris-to-Doris E2E case (DorisDorisE2ECase) and improve the existing test cases.
---
.../{build-extension.yml => build-connector.yml} | 4 +-
.github/workflows/run-e2ecase-12.yml | 44 ---------
.../{run-e2ecase-20.yml => run-e2ecase.yml} | 4 +-
.github/workflows/run-itcase-12.yml | 44 ---------
.../{run-itcase-20.yml => run-itcase.yml} | 4 +-
.../java/org/apache/doris/flink/DorisTestBase.java | 61 ++++++++----
.../apache/doris/flink/sink/DorisSinkITCase.java | 31 ++++--
.../doris/flink/source/DorisSourceITCase.java | 9 +-
.../cdc/DorisDorisE2ECase.java} | 108 ++++++++++-----------
.../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 14 ++-
10 files changed, 142 insertions(+), 181 deletions(-)
diff --git a/.github/workflows/build-extension.yml
b/.github/workflows/build-connector.yml
similarity index 97%
rename from .github/workflows/build-extension.yml
rename to .github/workflows/build-connector.yml
index 7259bb4..298894f 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-connector.yml
@@ -16,14 +16,14 @@
# under the License.
#
---
-name: Build Extensions
+name: Build Connector
on:
pull_request:
push:
jobs:
build-extension:
- name: "Build Extensions"
+ name: "Build Connector"
runs-on: ubuntu-latest
defaults:
run:
diff --git a/.github/workflows/run-e2ecase-12.yml
b/.github/workflows/run-e2ecase-12.yml
deleted file mode 100644
index fd89d20..0000000
--- a/.github/workflows/run-e2ecase-12.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
----
-name: Run E2ECases 1.2
-on:
- pull_request:
- push:
-
-jobs:
- build-extension:
- name: "Run E2ECases 1.2"
- runs-on: ubuntu-latest
- defaults:
- run:
- shell: bash
- steps:
- - name: Checkout
- uses: actions/checkout@master
-
- - name: Setup java
- uses: actions/setup-java@v2
- with:
- distribution: adopt
- java-version: '8'
-
- - name: Run E2ECases
- run: |
- cd flink-doris-connector && mvn test -Dtest="*E2ECase"
-Dimage="adamlee489/doris:1.2.7.1_x86"
-
diff --git a/.github/workflows/run-e2ecase-20.yml
b/.github/workflows/run-e2ecase.yml
similarity index 95%
rename from .github/workflows/run-e2ecase-20.yml
rename to .github/workflows/run-e2ecase.yml
index ebf7ae6..77492ae 100644
--- a/.github/workflows/run-e2ecase-20.yml
+++ b/.github/workflows/run-e2ecase.yml
@@ -16,14 +16,14 @@
# under the License.
#
---
-name: Run E2ECases 2.0
+name: Run E2ECases
on:
pull_request:
push:
jobs:
build-extension:
- name: "Run E2ECases 2.0"
+ name: "Run E2ECases"
runs-on: ubuntu-latest
defaults:
run:
diff --git a/.github/workflows/run-itcase-12.yml
b/.github/workflows/run-itcase-12.yml
deleted file mode 100644
index cd31c3a..0000000
--- a/.github/workflows/run-itcase-12.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
----
-name: Run ITCases 1.2
-on:
- pull_request:
- push:
-
-jobs:
- build-extension:
- name: "Run ITCases 1.2"
- runs-on: ubuntu-latest
- defaults:
- run:
- shell: bash
- steps:
- - name: Checkout
- uses: actions/checkout@master
-
- - name: Setup java
- uses: actions/setup-java@v2
- with:
- distribution: adopt
- java-version: '8'
-
- - name: Run ITCases
- run: |
- cd flink-doris-connector && mvn test -Dtest="*ITCase"
-Dimage="adamlee489/doris:1.2.7.1_x86"
-
diff --git a/.github/workflows/run-itcase-20.yml
b/.github/workflows/run-itcase.yml
similarity index 96%
rename from .github/workflows/run-itcase-20.yml
rename to .github/workflows/run-itcase.yml
index ad9ef5a..d03ee64 100644
--- a/.github/workflows/run-itcase-20.yml
+++ b/.github/workflows/run-itcase.yml
@@ -16,14 +16,14 @@
# under the License.
#
---
-name: Run ITCases 2.0
+name: Run ITCases
on:
pull_request:
push:
jobs:
build-extension:
- name: "Run ITCases 2.0"
+ name: "Run ITCases"
runs-on: ubuntu-latest
defaults:
run:
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index 278be8c..09ae4bd 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -28,6 +28,9 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLClassLoader;
@@ -51,7 +54,11 @@ import static org.awaitility.Durations.ONE_SECOND;
public abstract class DorisTestBase {
protected static final Logger LOG =
LoggerFactory.getLogger(DorisTestBase.class);
- protected static final String DORIS_DOCKER_IMAGE =
System.getProperty("image");
+ private static final String DEFAULT_DOCKER_IMAGE =
"adamlee489/doris:2.0.3";
+ protected static final String DORIS_DOCKER_IMAGE =
+ System.getProperty("image") == null
+ ? DEFAULT_DOCKER_IMAGE
+ : System.getProperty("image");
private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
@@ -59,7 +66,6 @@ public abstract class DorisTestBase {
protected static final String USERNAME = "root";
protected static final String PASSWORD = "";
protected static final GenericContainer DORIS_CONTAINER =
createDorisContainer();
- protected static Connection connection;
protected static final int DEFAULT_PARALLELISM = 4;
protected static String getFenodes() {
@@ -68,21 +74,21 @@ public abstract class DorisTestBase {
@BeforeClass
public static void startContainers() {
- LOG.info("Starting containers...");
+ LOG.info("Starting doris containers...");
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
given().ignoreExceptions()
.await()
.atMost(300, TimeUnit.SECONDS)
.pollInterval(ONE_SECOND)
.untilAsserted(DorisTestBase::initializeJdbcConnection);
- LOG.info("Containers are started.");
+ LOG.info("Containers doris are started.");
}
@AfterClass
public static void stopContainers() {
- LOG.info("Stopping containers...");
+ LOG.info("Stopping doris containers...");
DORIS_CONTAINER.stop();
- LOG.info("Containers are stopped.");
+ LOG.info("Containers doris are stopped.");
}
public static GenericContainer createDorisContainer() {
@@ -90,17 +96,11 @@ public abstract class DorisTestBase {
new GenericContainer<>(DORIS_DOCKER_IMAGE)
.withNetwork(Network.newNetwork())
.withNetworkAliases("DorisContainer")
- .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
- .withEnv("FE_ID", "1")
- .withEnv("CURRENT_BE_IP", "127.0.0.1")
- .withEnv("CURRENT_BE_PORT", "9050")
- .withCommand("ulimit -n 65536")
- .withCreateContainerCmdModifier(
- cmd -> cmd.getHostConfig().withMemorySwap(0L))
.withPrivilegedMode(true)
.withLogConsumer(
new Slf4jLogConsumer(
-
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
+
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+ .withReuse(true);
container.setPortBindings(
Lists.newArrayList(
@@ -118,10 +118,10 @@ public abstract class DorisTestBase {
new URL[] {new URL(DRIVER_JAR)},
DorisTestBase.class.getClassLoader());
LOG.info("Try to connect to Doris...");
Thread.currentThread().setContextClassLoader(urlClassLoader);
- connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
- try (Statement statement = connection.createStatement()) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
ResultSet resultSet;
do {
LOG.info("Wait for the Backend to start successfully...");
@@ -144,7 +144,12 @@ public abstract class DorisTestBase {
protected static void printClusterStatus() throws Exception {
LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
- try (Statement statement = connection.createStatement()) {
+ echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+ echo("sh", "-c", "free -h");
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
ResultSet showFrontends = statement.executeQuery("show frontends");
LOG.info("Frontends status: {}", convertList(showFrontends));
ResultSet showBackends = statement.executeQuery("show backends");
@@ -152,6 +157,24 @@ public abstract class DorisTestBase {
}
}
+ static void echo(String... cmd) {
+ try {
+ Process p = Runtime.getRuntime().exec(cmd);
+ InputStream is = p.getInputStream();
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(is));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ p.waitFor();
+ is.close();
+ reader.close();
+ p.destroy();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
private static List<Map> convertList(ResultSet rs) throws SQLException {
List<Map> list = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 26cbc2c..c9501d3 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -30,6 +30,8 @@ import
org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
@@ -45,7 +47,7 @@ import java.util.stream.Stream;
/** DorisSink ITCase with csv and arrow format. */
public class DorisSinkITCase extends DorisTestBase {
- static final String DATABASE = "test";
+ static final String DATABASE = "test_sink";
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";
@@ -61,9 +63,13 @@ public class DorisSinkITCase extends DorisTestBase {
Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement = connection.createStatement()) {
+
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
- sinkStatement.executeQuery(
+ statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1",
DATABASE, TABLE_CSV));
while (sinkResultSet.next()) {
@@ -102,9 +108,12 @@ public class DorisSinkITCase extends DorisTestBase {
Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement = connection.createStatement()) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
- sinkStatement.executeQuery(
+ statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON));
while (sinkResultSet.next()) {
@@ -172,9 +181,12 @@ public class DorisSinkITCase extends DorisTestBase {
Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement = connection.createStatement()) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
- sinkStatement.executeQuery(
+ statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON_TBL));
@@ -191,7 +203,10 @@ public class DorisSinkITCase extends DorisTestBase {
}
private void initializeTable(String table) throws Exception {
- try (Statement statement = connection.createStatement()) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS
%s", DATABASE));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, table));
statement.execute(
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index a5a3b53..f88b756 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -32,6 +32,8 @@ import
org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,7 +41,7 @@ import java.util.List;
/** DorisSource ITCase. */
public class DorisSourceITCase extends DorisTestBase {
- static final String DATABASE = "test";
+ static final String DATABASE = "test_source";
static final String TABLE_READ = "tbl_read";
static final String TABLE_READ_TBL = "tbl_read_tbl";
@@ -111,7 +113,10 @@ public class DorisSourceITCase extends DorisTestBase {
}
private void initializeTable(String table) throws Exception {
- try (Statement statement = connection.createStatement()) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS
%s", DATABASE));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, table));
statement.execute(
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
similarity index 54%
copy from
flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
copy to
flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
index a5a3b53..ad40255 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.source;
+package org.apache.doris.flink.tools.cdc;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -26,59 +25,26 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.doris.flink.DorisTestBase;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-/** DorisSource ITCase. */
-public class DorisSourceITCase extends DorisTestBase {
- static final String DATABASE = "test";
- static final String TABLE_READ = "tbl_read";
- static final String TABLE_READ_TBL = "tbl_read_tbl";
+/** DorisDorisE2ECase. */
+public class DorisDorisE2ECase extends DorisTestBase {
+ private static final String DATABASE_SOURCE = "test_e2e_source";
+ private static final String DATABASE_SINK = "test_e2e_sink";
+ private static final String TABLE = "test_tbl";
@Test
- public void testSource() throws Exception {
- initializeTable(TABLE_READ);
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
-
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes(getFenodes())
- .setTableIdentifier(DATABASE + "." + TABLE_READ)
- .setUsername(USERNAME)
- .setPassword(PASSWORD);
-
- DorisSource<List<?>> source =
- DorisSource.<List<?>>builder()
- .setDorisReadOptions(readOptionBuilder.build())
- .setDorisOptions(dorisBuilder.build())
- .setDeserializer(new SimpleListDeserializationSchema())
- .build();
- List<Object> actual = new ArrayList<>();
- try (CloseableIterator<List<?>> iterator =
- env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Doris Source")
- .executeAndCollect()) {
- while (iterator.hasNext()) {
- actual.add(iterator.next());
- }
- }
- List<Object> expected =
- Arrays.asList(Arrays.asList("doris", 18),
Arrays.asList("flink", 10));
- Assertions.assertIterableEquals(expected, actual);
- }
-
- @Test
- public void testTableSource() throws Exception {
- initializeTable(TABLE_READ_TBL);
+ public void testDoris2Doris() throws Exception {
+ initializeDorisTable(TABLE);
+ printClusterStatus();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -96,10 +62,27 @@ public class DorisSourceITCase extends DorisTestBase {
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
- getFenodes(), DATABASE + "." + TABLE_READ_TBL,
USERNAME, PASSWORD);
+ getFenodes(), DATABASE_SOURCE + "." + TABLE, USERNAME,
PASSWORD);
tEnv.executeSql(sourceDDL);
- TableResult tableResult = tEnv.executeSql("SELECT * FROM
doris_source");
+ String sinkDDL =
+ String.format(
+ "CREATE TABLE doris_sink ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(), DATABASE_SINK + "." + TABLE, USERNAME,
PASSWORD);
+ tEnv.executeSql(sinkDDL);
+
+ tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM
doris_source").await();
+
+ TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
List<Object> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
@@ -110,10 +93,25 @@ public class DorisSourceITCase extends DorisTestBase {
Assertions.assertIterableEquals(Arrays.asList(expected), actual);
}
- private void initializeTable(String table) throws Exception {
- try (Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS
%s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, table));
+ private void initializeDorisTable(String table) throws Exception {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
+ statement.execute(String.format("CREATE DATABASE IF NOT EXISTS
%s", DATABASE_SOURCE));
+ statement.execute(String.format("CREATE DATABASE IF NOT EXISTS
%s", DATABASE_SINK));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE_SOURCE, table));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE_SINK, table));
+ statement.execute(
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int\n"
+ + ") DISTRIBUTED BY HASH(`name`) BUCKETS
1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE_SOURCE, table));
statement.execute(
String.format(
"CREATE TABLE %s.%s ( \n"
@@ -123,11 +121,13 @@ public class DorisSourceITCase extends DorisTestBase {
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
- DATABASE, table));
+ DATABASE_SINK, table));
statement.execute(
- String.format("insert into %s.%s values ('doris',18)",
DATABASE, table));
+ String.format(
+ "insert into %s.%s values ('doris',18)",
DATABASE_SOURCE, table));
statement.execute(
- String.format("insert into %s.%s values ('flink',10)",
DATABASE, table));
+ String.format(
+ "insert into %s.%s values ('flink',10)",
DATABASE_SOURCE, table));
}
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 3390f75..99e7a13 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -63,7 +63,7 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
*/
public class MySQLDorisE2ECase extends DorisTestBase {
protected static final Logger LOG =
LoggerFactory.getLogger(MySQLDorisE2ECase.class);
- private static final String DATABASE = "test";
+ private static final String DATABASE = "test_e2e_mysql";
private static final String MYSQL_USER = "root";
private static final String MYSQL_PASSWD = "123456";
private static final String TABLE_1 = "tbl1";
@@ -276,7 +276,10 @@ public class MySQLDorisE2ECase extends DorisTestBase {
}
private void initializeDorisTable() throws Exception {
- try (Statement statement = connection.createStatement()) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, TABLE_1));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, TABLE_2));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, TABLE_3));
@@ -287,8 +290,11 @@ public class MySQLDorisE2ECase extends DorisTestBase {
public void checkResult(Set<List<Object>> expected, String query, int
columnSize)
throws Exception {
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement = connection.createStatement()) {
- ResultSet sinkResultSet = sinkStatement.executeQuery(query);
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(URL, DORIS_CONTAINER.getHost()),
USERNAME, PASSWORD);
+ Statement statement = connection.createStatement()) {
+ ResultSet sinkResultSet = statement.executeQuery(query);
while (sinkResultSet.next()) {
List<Object> row = new ArrayList<>();
for (int i = 1; i <= columnSize; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]