This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new cbb33bb87 [FLINK-35638][OceanBase][test] Refactor OceanBase test cases and remove dependency on host network (#3439)
cbb33bb87 is described below

commit cbb33bb8708d11b9b7b8d934aaa6009c4cffbfc2
Author: He Wang <[email protected]>
AuthorDate: Sun Aug 18 13:22:02 2024 +0800

    [FLINK-35638][OceanBase][test] Refactor OceanBase test cases and remove dependency on host network (#3439)
---
 .../flink-sources/tutorials/oceanbase-tutorial.md  |  55 ++++----
 .../flink-sources/tutorials/oceanbase-tutorial.md  |  46 +++---
 .../flink-connector-oceanbase-cdc/pom.xml          |  17 +++
 .../connectors/oceanbase/OceanBaseTestBase.java    |  72 +++-------
 .../connectors/oceanbase/OceanBaseTestUtils.java   |  67 +++++++++
 .../oceanbase/table/OceanBaseMySQLModeITCase.java  | 112 +++------------
 .../oceanbase/table/OceanBaseOracleModeITCase.java |  56 ++------
 .../oceanbase/testutils/LogProxyContainer.java     |  62 ++++++++
 .../oceanbase/testutils/OceanBaseCdcMetadata.java  |  54 +++++++
 .../oceanbase/testutils/OceanBaseContainer.java    | 141 +++++++++++++++++++
 .../testutils/OceanBaseMySQLCdcMetadata.java       | 113 +++++++++++++++
 .../testutils/OceanBaseOracleCdcMetadata.java      |  82 +++++++++++
 .../oceanbase/testutils/UniqueDatabase.java        | 156 +++++++++++++++++++++
 .../src/test/resources/ddl/mysql/docker_init.sql   |  17 ---
 .../tests/utils/PipelineTestEnvironment.java       |   2 -
 .../flink-cdc-source-e2e-tests/pom.xml             |   7 +
 .../cdc/connectors/tests/OceanBaseE2eITCase.java   | 141 ++++++-------------
 .../tests/utils/FlinkContainerTestEnvironment.java |   2 -
 .../src/test/resources/ddl/oceanbase_inventory.sql |   7 +-
 .../src/test/resources/docker/oceanbase/setup.sql  |  16 ---
 20 files changed, 841 insertions(+), 384 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md b/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
index 609e79700..62798a821 100644
--- a/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
+++ b/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
@@ -42,16 +42,23 @@ under the License.
 version: '2.1'
 services:
   observer:
-    image: oceanbase/oceanbase-ce:4.0.0.0
+    image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
     container_name: observer
-    network_mode: "host"
+    environment:
+      - 'MODE=mini'
+      - 'OB_SYS_PASSWORD=123456'
+      - 'OB_TENANT_PASSWORD=654321'
+    ports:
+      - '2881:2881'
+      - '2882:2882'
   oblogproxy:
-    image: whhe/oblogproxy:1.1.0_4x
+    image: 'oceanbase/oblogproxy-ce:latest'
     container_name: oblogproxy
     environment:
       - 'OB_SYS_USERNAME=root'
-      - 'OB_SYS_PASSWORD=pswd'
-    network_mode: "host"
+      - 'OB_SYS_PASSWORD=123456'
+    ports:
+      - '2983:2983'
   elasticsearch:
     image: 'elastic/elasticsearch:7.6.0'
     container_name: elasticsearch
@@ -85,42 +92,26 @@ services:
 docker-compose up -d
 ```
 
-### 设置密码
-
-OceanBase 中 root 用户默认是没有密码的,但是 oblogproxy 需要配置一个使用非空密码的系统租户用户,因此这里我们需要先为 root@sys 用户设置一个密码。
+### 查询 Root Service List
 
 登陆 sys 租户的 root 用户:
 
 ```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys
-```
-
-设置密码,注意这里的密码需要与上一步中 oblogproxy 服务的环境变量 'OB_SYS_PASSWORD' 保持一样。
-
-```mysql
-ALTER USER root IDENTIFIED BY 'pswd';
-```
-
-OceanBase 从社区版 4.0.0.0 开始只支持对非 sys 租户的增量数据拉取,这里我们使用 test 租户的 root 用户作为示例。
-
-登陆 test 租户的 root 用户:
-
-```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
 ```
 
-设置密码:
+执行以下 sql 以查询 root service list,将 VALUE 列的值保存下来。
 
 ```mysql
-ALTER USER root IDENTIFIED BY 'test';
+SHOW PARAMETERS LIKE 'rootservice_list';
 ```
 
 ### 准备数据
 
-使用 'root@test' 用户登陆。
+使用测试用的 test 租户的 root 用户登陆。
 
 ```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
 ```
 
 ```sql
@@ -169,6 +160,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
 
 ### 在 Flink SQL CLI 中使用 Flink DDL 创建表
 
+注意在 OceanBase 源表的 SQL 中替换 root_service_list 为真实值。
+
 ```sql
 -- 设置间隔时间为3秒                       
 Flink SQL> SET execution.checkpointing.interval = 3s;
@@ -189,13 +182,13 @@ Flink SQL> CREATE TABLE orders (
     'connector' = 'oceanbase-cdc',
     'scan.startup.mode' = 'initial',
     'username' = 'root@test',
-    'password' = 'test',
+    'password' = '654321',
     'tenant-name' = 'test',
     'database-name' = '^ob$',
     'table-name' = '^orders$',
     'hostname' = 'localhost',
     'port' = '2881',
-    'rootserver-list' = '127.0.0.1:2882:2881',
+    'rootserver-list' = '${root_service_list}',
     'logproxy.host' = 'localhost',
     'logproxy.port' = '2983',
     'working-mode' = 'memory'
@@ -211,13 +204,13 @@ Flink SQL> CREATE TABLE products (
     'connector' = 'oceanbase-cdc',
     'scan.startup.mode' = 'initial',
     'username' = 'root@test',
-    'password' = 'test',
+    'password' = '654321',
     'tenant-name' = 'test',
     'database-name' = '^ob$',
     'table-name' = '^products$',
     'hostname' = 'localhost',
     'port' = '2881',
-    'rootserver-list' = '127.0.0.1:2882:2881',
+    'rootserver-list' = '${root_service_list}',
     'logproxy.host' = 'localhost',
     'logproxy.port' = '2983',
     'working-mode' = 'memory'
diff --git a/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md b/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
index a9bdc3b98..744e8b79b 100644
--- a/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
+++ b/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
@@ -37,25 +37,27 @@ under the License.
 
 Create `docker-compose.yml`.
 
-*Note*: `host` network mode is required in this demo, so it can only work on Linux, see [network-tutorial-host](https://docs.docker.com/network/network-tutorial-host/).
-
 ```yaml
 version: '2.1'
 services:
   observer:
-    image: oceanbase/oceanbase-ce:4.2.0.0
+    image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
     container_name: observer
     environment:
-      - 'MODE=slim'
-      - 'OB_ROOT_PASSWORD=pswd'
-    network_mode: "host"
+      - 'MODE=mini'
+      - 'OB_SYS_PASSWORD=123456'
+      - 'OB_TENANT_PASSWORD=654321'
+    ports:
+      - '2881:2881'
+      - '2882:2882'
   oblogproxy:
-    image: whhe/oblogproxy:1.1.3_4x
+    image: 'oceanbase/oblogproxy-ce:latest'
     container_name: oblogproxy
     environment:
       - 'OB_SYS_USERNAME=root'
-      - 'OB_SYS_PASSWORD=pswd'
-    network_mode: "host"
+      - 'OB_SYS_PASSWORD=123456'
+    ports:
+      - '2983:2983'
   elasticsearch:
     image: 'elastic/elasticsearch:7.6.0'
     container_name: elasticsearch
@@ -89,22 +91,18 @@ Execute the following command in the directory where `docker-compose.yml` is loc
 docker-compose up -d
 ```
 
-### Set password
+### Query Root Service List
 
-From OceanBase 4.0.0.0 CE, we can only fetch the commit log of non-sys tenant.
-
-Here we use the 'test' tenant for example.
-
-Login with 'root' user of 'test' tenant:
+Login with 'root' user of 'sys' tenant:
 
 ```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
 ```
 
-Set a password:
+Query the root service list by following SQL and store the value.
 
 ```mysql
-ALTER USER root IDENTIFIED BY 'test';
+SHOW PARAMETERS LIKE 'rootservice_list';
 ```
 
 ### Create data for reading snapshot
@@ -112,7 +110,7 @@ ALTER USER root IDENTIFIED BY 'test';
 Login 'root' user of 'test' tenant.
 
 ```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
 ```
 
 Insert data:
@@ -163,6 +161,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
 
 ### Use Flink DDL to create dynamic table in Flink SQL CLI
 
+Note that in the SQL of the OceanBase source table, replace root_service_list with the actual value.
+
 ```sql
 -- checkpoint every 3000 milliseconds                     
 Flink SQL> SET execution.checkpointing.interval = 3s;
@@ -183,13 +183,13 @@ Flink SQL> CREATE TABLE orders (
     'connector' = 'oceanbase-cdc',
     'scan.startup.mode' = 'initial',
     'username' = 'root@test',
-    'password' = 'test',
+    'password' = '654321',
     'tenant-name' = 'test',
     'database-name' = '^ob$',
     'table-name' = '^orders$',
     'hostname' = 'localhost',
     'port' = '2881',
-    'rootserver-list' = '127.0.0.1:2882:2881',
+    'rootserver-list' = '${root_service_list}',
     'logproxy.host' = 'localhost',
     'logproxy.port' = '2983',
     'working-mode' = 'memory'
@@ -205,13 +205,13 @@ Flink SQL> CREATE TABLE products (
     'connector' = 'oceanbase-cdc',
     'scan.startup.mode' = 'initial',
     'username' = 'root@test',
-    'password' = 'test',
+    'password' = '654321',
     'tenant-name' = 'test',
     'database-name' = '^ob$',
     'table-name' = '^products$',
     'hostname' = 'localhost',
     'port' = '2881',
-    'rootserver-list' = '127.0.0.1:2882:2881',
+    'rootserver-list' = '${root_service_list}',
     'logproxy.host' = 'localhost',
     'logproxy.port' = '2983',
     'working-mode' = 'memory'
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
index 1ed25b89f..726d7f438 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
@@ -163,4 +163,21 @@ limitations under the License.
 
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test-jar</id>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
index c3a4c2ba5..413b7494f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
@@ -17,20 +17,18 @@
 
 package org.apache.flink.cdc.connectors.oceanbase;
 
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.utils.LegacyRowResource;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.ClassRule;
-import org.junit.Rule;
 
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
@@ -43,51 +41,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /** Basic class for testing OceanBase source. */
-public abstract class OceanBaseTestBase extends TestLogger {
+public abstract class OceanBaseTestBase extends AbstractTestBase {
 
     private static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)--.*$");
 
-    protected static final int DEFAULT_PARALLELISM = 4;
-
-    @Rule
-    public final MiniClusterWithClientResource miniClusterResource =
-            new MiniClusterWithClientResource(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
     @ClassRule public static LegacyRowResource usesLegacyRows = 
LegacyRowResource.INSTANCE;
 
-    protected final String compatibleMode;
-    protected final String username;
-    protected final String password;
-    protected final String hostname;
-    protected final int port;
-    protected final String logProxyHost;
-    protected final int logProxyPort;
-    protected final String tenant;
-
-    public OceanBaseTestBase(
-            String compatibleMode,
-            String username,
-            String password,
-            String hostname,
-            int port,
-            String logProxyHost,
-            int logProxyPort,
-            String tenant) {
-        this.compatibleMode = compatibleMode;
-        this.username = username;
-        this.password = password;
-        this.hostname = hostname;
-        this.port = port;
-        this.logProxyHost = logProxyHost;
-        this.logProxyPort = logProxyPort;
-        this.tenant = tenant;
-    }
+    protected abstract OceanBaseCdcMetadata metadata();
 
     protected String commonOptionsString() {
         return String.format(
@@ -96,8 +56,14 @@ public abstract class OceanBaseTestBase extends TestLogger {
                         + " 'password' = '%s', "
                         + " 'hostname' = '%s', "
                         + " 'port' = '%s', "
-                        + " 'compatible-mode' = '%s'",
-                username, password, hostname, port, compatibleMode);
+                        + " 'compatible-mode' = '%s', "
+                        + " 'jdbc.driver' = '%s'",
+                metadata().getUsername(),
+                metadata().getPassword(),
+                metadata().getHostname(),
+                metadata().getPort(),
+                metadata().getCompatibleMode(),
+                metadata().getDriverClass());
     }
 
     protected String logProxyOptionsString() {
@@ -106,7 +72,9 @@ public abstract class OceanBaseTestBase extends TestLogger {
                         + " 'tenant-name' = '%s',"
                         + " 'logproxy.host' = '%s',"
                         + " 'logproxy.port' = '%s'",
-                tenant, logProxyHost, logProxyPort);
+                metadata().getTenantName(),
+                metadata().getLogProxyHost(),
+                metadata().getLogProxyPort());
     }
 
     protected String initialOptionsString() {
@@ -120,7 +88,10 @@ public abstract class OceanBaseTestBase extends TestLogger {
         return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString();
     }
 
-    protected abstract Connection getJdbcConnection() throws SQLException;
+    protected Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                metadata().getJdbcUrl(), metadata().getUsername(), metadata().getPassword());
+    }
 
     protected void setGlobalTimeZone(String serverTimeZone) throws 
SQLException {
         try (Connection connection = getJdbcConnection();
@@ -130,7 +101,8 @@ public abstract class OceanBaseTestBase extends TestLogger {
     }
 
     protected void initializeTable(String sqlFile) {
-        final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, 
sqlFile);
+        final String ddlFile =
+                String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(), 
sqlFile);
         final URL ddlTestFile = 
getClass().getClassLoader().getResource(ddlFile);
         assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
         try (Connection connection = getJdbcConnection();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java
new file mode 100644
index 000000000..50885089e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase;
+
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.time.Duration;
+
+/** Utils to help test. */
+@SuppressWarnings("resource")
+public class OceanBaseTestUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseTestUtils.class);
+
+    private static final String LATEST_VERSION = "latest";
+    private static final String CDC_TEST_OB_VERSION = 
"4.2.1.6-106000012024042515";
+
+    private static final String SYS_PASSWORD = "123456";
+    private static final String TEST_PASSWORD = "654321";
+
+    public static OceanBaseContainer createOceanBaseContainerForCDC() {
+        return createOceanBaseContainer(CDC_TEST_OB_VERSION, "mini")
+                .withSysPassword(SYS_PASSWORD)
+                .withStartupTimeout(Duration.ofMinutes(4));
+    }
+
+    public static OceanBaseContainer createOceanBaseContainerForJdbc() {
+        return createOceanBaseContainer(LATEST_VERSION, "slim")
+                .withStartupTimeout(Duration.ofMinutes(2));
+    }
+
+    public static OceanBaseContainer createOceanBaseContainer(String version, 
String mode) {
+        return new OceanBaseContainer(version)
+                .withMode(mode)
+                .withTenantPassword(TEST_PASSWORD)
+                .withEnv("OB_DATAFILE_SIZE", "2G")
+                .withEnv("OB_LOG_DISK_SIZE", "4G")
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    public static LogProxyContainer createLogProxyContainer() {
+        return new LogProxyContainer(LATEST_VERSION)
+                .withSysPassword(SYS_PASSWORD)
+                .withStartupTimeout(Duration.ofMinutes(1))
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
index 4388b60af..616ccd5e2 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
@@ -18,89 +18,55 @@
 package org.apache.flink.cdc.connectors.oceanbase.table;
 
 import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import 
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+import 
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
+import org.testcontainers.containers.Network;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
-import java.time.Duration;
 import java.time.ZoneId;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.stream.Stream;
+
+import static 
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
+import static 
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
 
 /** Integration tests for OceanBase MySQL mode table source. */
-@RunWith(Parameterized.class)
 public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class);
-
     private final StreamExecutionEnvironment env =
             StreamExecutionEnvironment.getExecutionEnvironment();
     private final StreamTableEnvironment tEnv =
             StreamTableEnvironment.create(
                     env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
 
-    private static final String NETWORK_MODE = "host";
-    private static final String OB_SYS_PASSWORD = "123456";
+    @ClassRule public static final Network NETWORK = Network.newNetwork();
 
     @ClassRule
-    public static final GenericContainer<?> OB_SERVER =
-            new GenericContainer<>("oceanbase/oceanbase-ce:4.2.0.0")
-                    .withNetworkMode(NETWORK_MODE)
-                    .withEnv("MODE", "slim")
-                    .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
-                    .withEnv("OB_DATAFILE_SIZE", "1G")
-                    .withEnv("OB_LOG_DISK_SIZE", "4G")
-                    .withCopyFileToContainer(
-                            
MountableFile.forClasspathResource("ddl/mysql/docker_init.sql"),
-                            "/root/boot/init.d/init.sql")
-                    .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
-                    .withStartupTimeout(Duration.ofMinutes(4))
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    public static final OceanBaseContainer OB_SERVER =
+            createOceanBaseContainerForCDC().withNetwork(NETWORK);
 
     @ClassRule
-    public static final GenericContainer<?> LOG_PROXY =
-            new GenericContainer<>("whhe/oblogproxy:1.1.3_4x")
-                    .withNetworkMode(NETWORK_MODE)
-                    .withEnv("OB_SYS_PASSWORD", OB_SYS_PASSWORD)
-                    .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
-                    .withStartupTimeout(Duration.ofMinutes(1))
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-    @BeforeClass
-    public static void startContainers() {
-        LOG.info("Starting containers...");
-        Startables.deepStart(Stream.of(OB_SERVER, LOG_PROXY)).join();
-        LOG.info("Containers are started.");
-    }
+    public static final LogProxyContainer LOG_PROXY =
+            createLogProxyContainer().withNetwork(NETWORK);
+
+    private static final OceanBaseCdcMetadata METADATA =
+            new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
 
-    @AfterClass
-    public static void stopContainers() {
-        LOG.info("Stopping containers...");
-        Stream.of(OB_SERVER, LOG_PROXY).forEach(GenericContainer::stop);
-        LOG.info("Containers are stopped.");
+    @Override
+    protected OceanBaseCdcMetadata metadata() {
+        return METADATA;
     }
 
     @Before
@@ -110,47 +76,11 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
     }
 
-    private final String rsList;
-
-    public OceanBaseMySQLModeITCase(
-            String username,
-            String password,
-            String hostname,
-            int port,
-            String logProxyHost,
-            int logProxyPort,
-            String tenant,
-            String rsList) {
-        super("mysql", username, password, hostname, port, logProxyHost, 
logProxyPort, tenant);
-        this.rsList = rsList;
-    }
-
-    @Parameterized.Parameters
-    public static List<Object[]> parameters() {
-        return Collections.singletonList(
-                new Object[] {
-                    "root@test",
-                    "123456",
-                    "127.0.0.1",
-                    2881,
-                    "127.0.0.1",
-                    2983,
-                    "test",
-                    "127.0.0.1:2882:2881"
-                });
-    }
-
     @Override
     protected String logProxyOptionsString() {
         return super.logProxyOptionsString()
                 + " , "
-                + String.format(" 'rootserver-list' = '%s'", rsList);
-    }
-
-    @Override
-    protected Connection getJdbcConnection() throws SQLException {
-        return DriverManager.getConnection(
-                "jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false", 
username, password);
+                + String.format(" 'rootserver-list' = '%s'", 
METADATA.getRsList());
     }
 
     @Test
@@ -312,6 +242,8 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
 
         waitForSinkSize("sink", snapshotSize + 1);
 
+        String tenant = metadata().getTenantName();
+
         List<String> expected =
                 Arrays.asList(
                         "+I("
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
index b11da43ae..e7345baba 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
@@ -18,6 +18,8 @@
 package org.apache.flink.cdc.connectors.oceanbase.table;
 
 import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
+import 
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import 
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseOracleCdcMetadata;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
@@ -26,20 +28,14 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 /** Integration tests for OceanBase Oracle mode table source. */
 @Ignore("Test ignored before oceanbase-xe docker image is available")
-@RunWith(Parameterized.class)
 public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
 
     private final StreamExecutionEnvironment env =
@@ -48,61 +44,25 @@ public class OceanBaseOracleModeITCase extends 
OceanBaseTestBase {
             StreamTableEnvironment.create(
                     env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
 
-    private final String schema;
-    private final String configUrl;
-
-    public OceanBaseOracleModeITCase(
-            String username,
-            String password,
-            String hostname,
-            int port,
-            String logProxyHost,
-            int logProxyPort,
-            String tenant,
-            String schema,
-            String configUrl) {
-        super("oracle", username, password, hostname, port, logProxyHost, 
logProxyPort, tenant);
-        this.schema = schema;
-        this.configUrl = configUrl;
-    }
-
-    @Parameterized.Parameters
-    public static List<Object[]> parameters() {
-        return Collections.singletonList(
-                new Object[] {
-                    "SYS@test",
-                    "123456",
-                    "127.0.0.1",
-                    2881,
-                    "127.0.0.1",
-                    2983,
-                    "test",
-                    "SYS",
-                    
"http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster";
-                });
-    }
+    private static final OceanBaseCdcMetadata METADATA = new 
OceanBaseOracleCdcMetadata();
 
     @Override
-    protected String commonOptionsString() {
-        return super.commonOptionsString() + " , " + " 'jdbc.driver' = 
'com.oceanbase.jdbc.Driver'";
+    protected OceanBaseCdcMetadata metadata() {
+        return METADATA;
     }
 
     @Override
     protected String logProxyOptionsString() {
         return super.logProxyOptionsString()
                 + " , "
-                + String.format(" 'config-url' = '%s'", configUrl);
-    }
-
-    @Override
-    protected Connection getJdbcConnection() throws SQLException {
-        return DriverManager.getConnection(
-                "jdbc:oceanbase://" + hostname + ":" + port + "/" + schema, 
username, password);
+                + String.format(" 'config-url' = '%s'", 
METADATA.getConfigUrl());
     }
 
     @Test
     public void testAllDataTypes() throws Exception {
         initializeTable("column_type_test");
+
+        String schema = metadata().getDatabase();
         String sourceDDL =
                 String.format(
                         "CREATE TABLE full_types ("
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java
new file mode 100644
index 000000000..c33eccbb4
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import org.jetbrains.annotations.NotNull;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** OceanBase Log Proxy container. */
+public class LogProxyContainer extends GenericContainer<LogProxyContainer> {
+
+    private static final String IMAGE = "oceanbase/oblogproxy-ce";
+
+    private static final int PORT = 2983;
+    private static final String ROOT_USER = "root";
+
+    private String sysPassword;
+
+    public LogProxyContainer(String version) {
+        super(DockerImageName.parse(IMAGE + ":" + version));
+        addExposedPorts(PORT);
+        setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
+    }
+
+    @Override
+    protected void configure() {
+        addEnv("OB_SYS_USERNAME", ROOT_USER);
+        addEnv("OB_SYS_PASSWORD", sysPassword);
+    }
+
+    public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
+        return Collections.singleton(this.getMappedPort(PORT));
+    }
+
+    public int getPort() {
+        return getMappedPort(PORT);
+    }
+
+    public LogProxyContainer withSysPassword(String sysPassword) {
+        this.sysPassword = sysPassword;
+        return this;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java
new file mode 100644
index 000000000..bb2469509
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.io.Serializable;
+
+/** OceanBase CDC metadata. */
+public interface OceanBaseCdcMetadata extends Serializable {
+
+    String getCompatibleMode();
+
+    String getHostname();
+
+    int getPort();
+
+    String getUsername();
+
+    String getPassword();
+
+    String getDriverClass();
+
+    String getDatabase();
+
+    String getJdbcUrl();
+
+    String getTenantName();
+
+    String getLogProxyHost();
+
+    int getLogProxyPort();
+
+    default String getConfigUrl() {
+        return null;
+    }
+
+    default String getRsList() {
+        return null;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java
new file mode 100644
index 000000000..1cd498460
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import org.jetbrains.annotations.NotNull;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** OceanBase container. */
+public class OceanBaseContainer extends 
JdbcDatabaseContainer<OceanBaseContainer> {
+
+    private static final String IMAGE = "oceanbase/oceanbase-ce";
+
+    private static final int SQL_PORT = 2881;
+    private static final int RPC_PORT = 2882;
+    private static final String ROOT_USER = "root";
+    private static final String TEST_DATABASE = "test";
+    private static final String DEFAULT_TENANT = "test";
+    private static final String DEFAULT_PASSWORD = "";
+
+    private String mode = "mini";
+    private String tenantName = DEFAULT_TENANT;
+    private String sysPassword = DEFAULT_PASSWORD;
+    private String tenantPassword = DEFAULT_PASSWORD;
+
+    public OceanBaseContainer(String version) {
+        super(DockerImageName.parse(IMAGE + ":" + version));
+        addExposedPorts(SQL_PORT, RPC_PORT);
+        setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
+    }
+
+    @Override
+    protected void configure() {
+        addEnv("MODE", mode);
+        addEnv("OB_CLUSTER_NAME", "flink-cdc-ci");
+        if (!DEFAULT_PASSWORD.equals(sysPassword)) {
+            addEnv("OB_SYS_PASSWORD", sysPassword);
+        }
+        if (!DEFAULT_TENANT.equals(tenantName)) {
+            addEnv("OB_TENANT_NAME", tenantName);
+        }
+        if (!DEFAULT_PASSWORD.equals(tenantPassword)) {
+            addEnv("OB_TENANT_PASSWORD", tenantPassword);
+        }
+    }
+
+    protected void waitUntilContainerStarted() {
+        this.getWaitStrategy().waitUntilReady(this);
+    }
+
+    public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
+        return Collections.singleton(this.getMappedPort(SQL_PORT));
+    }
+
+    @Override
+    public String getDriverClassName() {
+        return "com.mysql.cj.jdbc.Driver";
+    }
+
+    public String getJdbcUrl(String databaseName) {
+        return "jdbc:mysql://"
+                + getHost()
+                + ":"
+                + getDatabasePort()
+                + "/"
+                + databaseName
+                + "?useSSL=false";
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl("");
+    }
+
+    public int getDatabasePort() {
+        return getMappedPort(SQL_PORT);
+    }
+
+    public String getTenantName() {
+        return tenantName;
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return TEST_DATABASE;
+    }
+
+    @Override
+    public String getUsername() {
+        return ROOT_USER + "@" + tenantName;
+    }
+
+    @Override
+    public String getPassword() {
+        return tenantPassword;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+
+    public OceanBaseContainer withMode(String mode) {
+        this.mode = mode;
+        return this;
+    }
+
+    public OceanBaseContainer withTenantName(String tenantName) {
+        this.tenantName = tenantName;
+        return this;
+    }
+
+    public OceanBaseContainer withSysPassword(String sysPassword) {
+        this.sysPassword = sysPassword;
+        return this;
+    }
+
+    public OceanBaseContainer withTenantPassword(String tenantPassword) {
+        this.tenantPassword = tenantPassword;
+        return this;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
new file mode 100644
index 000000000..aede8a292
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** OceanBase CDC MySQL mode metadata. */
+public class OceanBaseMySQLCdcMetadata implements OceanBaseCdcMetadata {
+
+    private final OceanBaseContainer obServerContainer;
+    private final LogProxyContainer logProxyContainer;
+
+    private String rsList;
+
+    public OceanBaseMySQLCdcMetadata(
+            OceanBaseContainer obServerContainer, LogProxyContainer 
logProxyContainer) {
+        this.obServerContainer = obServerContainer;
+        this.logProxyContainer = logProxyContainer;
+    }
+
+    @Override
+    public String getCompatibleMode() {
+        return "mysql";
+    }
+
+    @Override
+    public String getHostname() {
+        return obServerContainer.getHost();
+    }
+
+    @Override
+    public int getPort() {
+        return obServerContainer.getDatabasePort();
+    }
+
+    @Override
+    public String getUsername() {
+        return obServerContainer.getUsername();
+    }
+
+    @Override
+    public String getPassword() {
+        return obServerContainer.getPassword();
+    }
+
+    @Override
+    public String getDriverClass() {
+        return obServerContainer.getDriverClassName();
+    }
+
+    @Override
+    public String getDatabase() {
+        return obServerContainer.getDatabaseName();
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return "jdbc:mysql://" + getHostname() + ":" + getPort() + 
"/?useSSL=false";
+    }
+
+    @Override
+    public String getTenantName() {
+        return obServerContainer.getTenantName();
+    }
+
+    @Override
+    public String getLogProxyHost() {
+        return logProxyContainer.getHost();
+    }
+
+    @Override
+    public int getLogProxyPort() {
+        return logProxyContainer.getPort();
+    }
+
+    @Override
+    public String getRsList() {
+        if (rsList == null) {
+            try (Connection connection =
+                            DriverManager.getConnection(
+                                    getJdbcUrl(), getUsername(), 
getPassword());
+                    Statement statement = connection.createStatement()) {
+                ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 
'rootservice_list'");
+                rsList = rs.next() ? rs.getString("VALUE") : null;
+            } catch (SQLException e) {
+                throw new RuntimeException("Failed to query rs list", e);
+            }
+            if (rsList == null) {
+                throw new RuntimeException("Got empty rs list");
+            }
+        }
+        return rsList;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
new file mode 100644
index 000000000..c68fdda03
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+/** OceanBase CDC Oracle mode metadata. */
+public class OceanBaseOracleCdcMetadata implements OceanBaseCdcMetadata {
+
+    @Override
+    public String getCompatibleMode() {
+        return "oracle";
+    }
+
+    @Override
+    public String getHostname() {
+        return System.getenv("host");
+    }
+
+    @Override
+    public int getPort() {
+        return Integer.parseInt(System.getenv("port"));
+    }
+
+    @Override
+    public String getUsername() {
+        return System.getenv("username");
+    }
+
+    @Override
+    public String getPassword() {
+        return System.getenv("password");
+    }
+
+    @Override
+    public String getDatabase() {
+        return System.getenv("schema");
+    }
+
+    @Override
+    public String getDriverClass() {
+        return "com.oceanbase.jdbc.Driver";
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return "jdbc:oceanbase://" + getHostname() + ":" + getPort() + "/" + 
getDatabase();
+    }
+
+    @Override
+    public String getTenantName() {
+        return System.getenv("tenant");
+    }
+
+    @Override
+    public String getLogProxyHost() {
+        return System.getenv("log_proxy_host");
+    }
+
+    @Override
+    public int getLogProxyPort() {
+        return Integer.parseInt(System.getenv("log_proxy_port"));
+    }
+
+    @Override
+    public String getConfigUrl() {
+        return System.getenv("config_url");
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
new file mode 100644
index 000000000..d189d030f
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Create and populate a unique instance of an OceanBase database for each run 
of a JUnit test. A user
+ * of this class needs to provide the container and a database name. It 
is expected that
+ * there is an init file in 
<code>src/test/resources/ddl/&lt;database_name&gt;.sql</code>. The
+ * database name is enriched with a unique suffix that guarantees complete 
isolation between runs
+ * <code>
+ * &lt;database_name&gt;_&lt;suffix&gt;</code>
+ *
+ * <p>This class is inspired by the Debezium project.
+ */
+public class UniqueDatabase {
+
+    private static final String[] CREATE_DATABASE_DDL =
+            new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
+    private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS 
`$DBNAME$`;";
+    private static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)--.*$");
+
+    private final OceanBaseContainer container;
+    private final String databaseName;
+    private final String templateName;
+
+    public UniqueDatabase(OceanBaseContainer container, String databaseName) {
+        this(container, databaseName, Integer.toUnsignedString(new 
Random().nextInt(), 36));
+    }
+
+    private UniqueDatabase(
+            OceanBaseContainer container, String databaseName, final String 
identifier) {
+        this.container = container;
+        this.databaseName = databaseName + "_" + identifier;
+        this.templateName = databaseName;
+    }
+
+    public String getHost() {
+        return container.getHost();
+    }
+
+    public int getDatabasePort() {
+        return container.getDatabasePort();
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getUsername() {
+        return container.getUsername();
+    }
+
+    public String getPassword() {
+        return container.getPassword();
+    }
+
+    /** @return Fully qualified table name 
<code>&lt;databaseName&gt;.&lt;tableName&gt;</code> */
+    public String qualifiedTableName(final String tableName) {
+        return String.format("%s.%s", databaseName, tableName);
+    }
+
+    /** Creates the database and populates it with initialization SQL script. 
*/
+    public void createAndInitialize() {
+        final String ddlFile = String.format("ddl/%s.sql", templateName);
+        final URL ddlTestFile = 
UniqueDatabase.class.getClassLoader().getResource(ddlFile);
+        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+        try {
+            try (Connection connection =
+                            DriverManager.getConnection(
+                                    container.getJdbcUrl(), getUsername(), 
getPassword());
+                    Statement statement = connection.createStatement()) {
+                final List<String> statements =
+                        Arrays.stream(
+                                        Stream.concat(
+                                                        
Arrays.stream(CREATE_DATABASE_DDL),
+                                                        Files.readAllLines(
+                                                                
Paths.get(ddlTestFile.toURI()))
+                                                                .stream())
+                                                .map(String::trim)
+                                                .filter(x -> 
!x.startsWith("--") && !x.isEmpty())
+                                                .map(
+                                                        x -> {
+                                                            final Matcher m =
+                                                                    
COMMENT_PATTERN.matcher(x);
+                                                            return m.matches() 
? m.group(1) : x;
+                                                        })
+                                                .map(this::convertSQL)
+                                                
.collect(Collectors.joining("\n"))
+                                                .split(";"))
+                                .map(x -> x.replace("$$", ";"))
+                                .collect(Collectors.toList());
+                for (String stmt : statements) {
+                    statement.execute(stmt);
+                }
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Drops the database if it exists. */
+    public void dropDatabase() {
+        try {
+            try (Connection connection =
+                            DriverManager.getConnection(
+                                    container.getJdbcUrl(), getUsername(), 
getPassword());
+                    Statement statement = connection.createStatement()) {
+                final String dropDatabaseStatement = 
convertSQL(DROP_DATABASE_DDL);
+                statement.execute(dropDatabaseStatement);
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                container.getJdbcUrl(databaseName), getUsername(), 
getPassword());
+    }
+
+    private String convertSQL(final String sql) {
+        return sql.replace("$DBNAME$", databaseName);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
deleted file mode 100644
index 0db9c71db..000000000
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
+++ /dev/null
@@ -1,17 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements.  See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License.  You may obtain a copy of the License at
---
---      http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- Set the root user password of test tenant
-ALTER USER root IDENTIFIED BY '123456';
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index d1c0bb7e7..72576bf17 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -102,7 +102,6 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
                 new GenericContainer<>(getFlinkDockerImageTag())
                         .withCommand("jobmanager")
                         .withNetwork(NETWORK)
-                        .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
                         .withExposedPorts(JOB_MANAGER_REST_PORT)
                         .withEnv("FLINK_PROPERTIES", flinkProperties)
@@ -111,7 +110,6 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
         taskManager =
                 new GenericContainer<>(getFlinkDockerImageTag())
                         .withCommand("taskmanager")
-                        .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetwork(NETWORK)
                         .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
                         .withEnv("FLINK_PROPERTIES", flinkProperties)
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index c1e50101a..cdb0b43e8 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -91,6 +91,13 @@ limitations under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-oceanbase-cdc</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-oracle-cdc</artifactId>
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
index e257311a5..2cd2f88eb 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -19,99 +19,68 @@ package org.apache.flink.cdc.connectors.tests;
 
 import org.apache.flink.cdc.common.test.utils.JdbcProxy;
 import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import 
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+import 
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.UniqueDatabase;
 import 
org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.MountableFile;
-
-import java.net.URL;
-import java.nio.file.Files;
+
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
-import static org.junit.Assert.assertNotNull;
+import static 
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
+import static 
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
 
 /** End-to-end tests for oceanbase-cdc connector uber jar. */
 public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseE2eITCase.class);
-
-    private static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)--.*$");
+    private static final String INTER_CONTAINER_OB_SERVER_ALIAS = "oceanbase";
+    private static final String INTER_CONTAINER_LOG_PROXY_ALIAS = "oblogproxy";
 
     private static final Path obCdcJar = 
TestUtils.getResource("oceanbase-cdc-connector.jar");
     private static final Path mysqlDriverJar = 
TestUtils.getResource("mysql-driver.jar");
 
-    // 
------------------------------------------------------------------------------------------
-    // OceanBase container variables
-    // 
------------------------------------------------------------------------------------------
-    private static final String OB_SERVER_IMAGE = 
"oceanbase/oceanbase-ce:4.2.0.0";
-    private static final String OB_LOG_PROXY_IMAGE = 
"whhe/oblogproxy:1.1.3_4x";
-    private static final String NETWORK_MODE = "host";
-    private static final String INTER_CONTAINER_OB_HOST = 
"host.docker.internal";
-    private static final String SYS_PASSWORD = "1234567";
-    private static final String TEST_TENANT = "test";
-    private static final String TEST_USER = "root@" + TEST_TENANT;
-    private static final String TEST_PASSWORD = "7654321";
-
     @ClassRule
-    public static final GenericContainer<?> OB_SERVER =
-            new GenericContainer<>(OB_SERVER_IMAGE)
-                    .withNetworkMode(NETWORK_MODE)
-                    .withEnv("MODE", "slim")
-                    .withEnv("OB_DATAFILE_SIZE", "1G")
-                    .withEnv("OB_LOG_DISK_SIZE", "4G")
-                    .withEnv("OB_ROOT_PASSWORD", SYS_PASSWORD)
-                    .withEnv("OB_TENANT_NAME", TEST_TENANT)
-                    .withCopyFileToContainer(
-                            
MountableFile.forClasspathResource("docker/oceanbase/setup.sql"),
-                            "/root/boot/init.d/init.sql")
-                    .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
-                    .withStartupTimeout(Duration.ofMinutes(3))
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    public static final OceanBaseContainer OB_SERVER =
+            createOceanBaseContainerForCDC()
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_OB_SERVER_ALIAS);
 
     @ClassRule
-    public static final GenericContainer<?> LOG_PROXY =
-            new GenericContainer<>(OB_LOG_PROXY_IMAGE)
-                    .withNetworkMode(NETWORK_MODE)
-                    .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD)
-                    .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
-                    .withStartupTimeout(Duration.ofMinutes(1))
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    public static final LogProxyContainer LOG_PROXY =
+            createLogProxyContainer()
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_LOG_PROXY_ALIAS);
+
+    private static final OceanBaseCdcMetadata METADATA =
+            new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
+
+    protected final UniqueDatabase obInventoryDatabase =
+            new UniqueDatabase(OB_SERVER, "oceanbase_inventory");
 
     @Before
     public void before() {
         super.before();
 
-        initializeTable("oceanbase_inventory");
+        obInventoryDatabase.createAndInitialize();
     }
 
-    private Connection getTestConnection(String databaseName) {
-        try {
-            Class.forName(MYSQL_DRIVER_CLASS);
-            return DriverManager.getConnection(
-                    
String.format("jdbc:mysql://127.0.0.1:2881/%s?useSSL=false", databaseName),
-                    TEST_USER,
-                    TEST_PASSWORD);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to get test jdbc connection", 
e);
-        }
+    @After
+    public void after() {
+        super.after();
+
+        obInventoryDatabase.dropDatabase();
     }
 
     @Test
@@ -131,16 +100,18 @@ public class OceanBaseE2eITCase extends 
FlinkContainerTestEnvironment {
                         ") WITH (",
                         " 'connector' = 'oceanbase-cdc',",
                         " 'scan.startup.mode' = 'initial',",
-                        " 'username' = '" + TEST_USER + "',",
-                        " 'password' = '" + TEST_PASSWORD + "',",
-                        " 'tenant-name' = '" + TEST_TENANT + "',",
-                        " 'table-list' = 'inventory.products_source',",
-                        " 'hostname' = '" + INTER_CONTAINER_OB_HOST + "',",
+                        " 'username' = '" + METADATA.getUsername() + "',",
+                        " 'password' = '" + METADATA.getPassword() + "',",
+                        " 'tenant-name' = '" + METADATA.getTenantName() + "',",
+                        " 'table-list' = '"
+                                + 
obInventoryDatabase.qualifiedTableName("products_source")
+                                + "',",
+                        " 'hostname' = '" + INTER_CONTAINER_OB_SERVER_ALIAS + 
"',",
                         " 'port' = '2881',",
-                        " 'jdbc.driver' = '" + MYSQL_DRIVER_CLASS + "',",
-                        " 'logproxy.host' = '" + INTER_CONTAINER_OB_HOST + 
"',",
+                        " 'jdbc.driver' = '" + METADATA.getDriverClass() + 
"',",
+                        " 'logproxy.host' = '" + 
INTER_CONTAINER_LOG_PROXY_ALIAS + "',",
                         " 'logproxy.port' = '2983',",
-                        " 'rootserver-list' = '127.0.0.1:2882:2881',",
+                        " 'rootserver-list' = '" + METADATA.getRsList() + "',",
                         " 'working-mode' = 'memory',",
                         " 'jdbc.properties.useSSL' = 'false'",
                         ");",
@@ -168,7 +139,7 @@ public class OceanBaseE2eITCase extends 
FlinkContainerTestEnvironment {
         submitSQLJob(sqlLines, obCdcJar, jdbcJar, mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
 
-        try (Connection conn = getTestConnection("inventory");
+        try (Connection conn = obInventoryDatabase.getJdbcConnection();
                 Statement stat = conn.createStatement()) {
             stat.execute(
                     "UPDATE products_source SET description='18oz carpenter 
hammer' WHERE id=106;");
@@ -211,32 +182,4 @@ public class OceanBaseE2eITCase extends 
FlinkContainerTestEnvironment {
                 new String[] {"id", "name", "description", "weight", "enum_c", 
"json_c"},
                 60000L);
     }
-
-    protected void initializeTable(String sqlFile) {
-        final String ddlFile = String.format("ddl/%s.sql", sqlFile);
-        final URL ddlTestFile = 
OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
-        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
-        try (Connection connection = getTestConnection("");
-                Statement statement = connection.createStatement()) {
-            final List<String> statements =
-                    Arrays.stream(
-                                    
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
-                                            .map(String::trim)
-                                            .filter(x -> !x.startsWith("--") 
&& !x.isEmpty())
-                                            .map(
-                                                    x -> {
-                                                        final Matcher m =
-                                                                
COMMENT_PATTERN.matcher(x);
-                                                        return m.matches() ? 
m.group(1) : x;
-                                                    })
-                                            .collect(Collectors.joining("\n"))
-                                            .split(";"))
-                            .collect(Collectors.toList());
-            for (String stmt : statements) {
-                statement.execute(stmt);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
 }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 91e7e5c6f..49ef039e0 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -135,7 +135,6 @@ public abstract class FlinkContainerTestEnvironment extends 
TestLogger {
                 new GenericContainer<>(getFlinkDockerImageTag())
                         .withCommand("jobmanager")
                         .withNetwork(NETWORK)
-                        .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
                         .withExposedPorts(JOB_MANAGER_REST_PORT)
                         .withEnv("FLINK_PROPERTIES", flinkProperties)
@@ -143,7 +142,6 @@ public abstract class FlinkContainerTestEnvironment extends 
TestLogger {
         taskManager =
                 new GenericContainer<>(getFlinkDockerImageTag())
                         .withCommand("taskmanager")
-                        .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetwork(NETWORK)
                         .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
                         .withEnv("FLINK_PROPERTIES", flinkProperties)
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
index 6afe82d87..9c4ec5969 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
+++ 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
@@ -16,13 +16,8 @@
 -- 
----------------------------------------------------------------------------------------------------------------
 -- DATABASE:  inventory
 -- 
----------------------------------------------------------------------------------------------------------------
--- Create and populate our products using a single insert with many rows
-DROP DATABASE IF EXISTS inventory;
-
-CREATE DATABASE inventory;
-
-USE inventory;
 
+-- Create and populate our products using a single insert with many rows
 CREATE TABLE products_source (
     id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
     name VARCHAR(255) NOT NULL DEFAULT 'flink',
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
deleted file mode 100644
index ac15e8cb9..000000000
--- 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
+++ /dev/null
@@ -1,16 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements.  See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License.  You may obtain a copy of the License at
--- 
---      http://www.apache.org/licenses/LICENSE-2.0
--- 
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-ALTER USER root IDENTIFIED BY '7654321';

Reply via email to