This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new cbb33bb87 [FLINK-35638][OceanBase][test] Refactor OceanBase test cases
and remove dependency on host network (#3439)
cbb33bb87 is described below
commit cbb33bb8708d11b9b7b8d934aaa6009c4cffbfc2
Author: He Wang <[email protected]>
AuthorDate: Sun Aug 18 13:22:02 2024 +0800
[FLINK-35638][OceanBase][test] Refactor OceanBase test cases and remove
dependency on host network (#3439)
---
.../flink-sources/tutorials/oceanbase-tutorial.md | 55 ++++----
.../flink-sources/tutorials/oceanbase-tutorial.md | 46 +++---
.../flink-connector-oceanbase-cdc/pom.xml | 17 +++
.../connectors/oceanbase/OceanBaseTestBase.java | 72 +++-------
.../connectors/oceanbase/OceanBaseTestUtils.java | 67 +++++++++
.../oceanbase/table/OceanBaseMySQLModeITCase.java | 112 +++------------
.../oceanbase/table/OceanBaseOracleModeITCase.java | 56 ++------
.../oceanbase/testutils/LogProxyContainer.java | 62 ++++++++
.../oceanbase/testutils/OceanBaseCdcMetadata.java | 54 +++++++
.../oceanbase/testutils/OceanBaseContainer.java | 141 +++++++++++++++++++
.../testutils/OceanBaseMySQLCdcMetadata.java | 113 +++++++++++++++
.../testutils/OceanBaseOracleCdcMetadata.java | 82 +++++++++++
.../oceanbase/testutils/UniqueDatabase.java | 156 +++++++++++++++++++++
.../src/test/resources/ddl/mysql/docker_init.sql | 17 ---
.../tests/utils/PipelineTestEnvironment.java | 2 -
.../flink-cdc-source-e2e-tests/pom.xml | 7 +
.../cdc/connectors/tests/OceanBaseE2eITCase.java | 141 ++++++-------------
.../tests/utils/FlinkContainerTestEnvironment.java | 2 -
.../src/test/resources/ddl/oceanbase_inventory.sql | 7 +-
.../src/test/resources/docker/oceanbase/setup.sql | 16 ---
20 files changed, 841 insertions(+), 384 deletions(-)
diff --git
a/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
b/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
index 609e79700..62798a821 100644
---
a/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
+++
b/docs/content.zh/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
@@ -42,16 +42,23 @@ under the License.
version: '2.1'
services:
observer:
- image: oceanbase/oceanbase-ce:4.0.0.0
+ image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
container_name: observer
- network_mode: "host"
+ environment:
+ - 'MODE=mini'
+ - 'OB_SYS_PASSWORD=123456'
+ - 'OB_TENANT_PASSWORD=654321'
+ ports:
+ - '2881:2881'
+ - '2882:2882'
oblogproxy:
- image: whhe/oblogproxy:1.1.0_4x
+ image: 'oceanbase/oblogproxy-ce:latest'
container_name: oblogproxy
environment:
- 'OB_SYS_USERNAME=root'
- - 'OB_SYS_PASSWORD=pswd'
- network_mode: "host"
+ - 'OB_SYS_PASSWORD=123456'
+ ports:
+ - '2983:2983'
elasticsearch:
image: 'elastic/elasticsearch:7.6.0'
container_name: elasticsearch
@@ -85,42 +92,26 @@ services:
docker-compose up -d
```
-### 设置密码
-
-OceanBase 中 root 用户默认是没有密码的,但是 oblogproxy 需要配置一个使用非空密码的系统租户用户,因此这里我们需要先为
root@sys 用户设置一个密码。
+### 查询 Root Service List
登陆 sys 租户的 root 用户:
```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys
-```
-
-设置密码,注意这里的密码需要与上一步中 oblogproxy 服务的环境变量 'OB_SYS_PASSWORD' 保持一样。
-
-```mysql
-ALTER USER root IDENTIFIED BY 'pswd';
-```
-
-OceanBase 从社区版 4.0.0.0 开始只支持对非 sys 租户的增量数据拉取,这里我们使用 test 租户的 root 用户作为示例。
-
-登陆 test 租户的 root 用户:
-
-```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
```
-设置密码:
+执行以下 sql 以查询 root service list,将 VALUE 列的值保存下来。
```mysql
-ALTER USER root IDENTIFIED BY 'test';
+SHOW PARAMETERS LIKE 'rootservice_list';
```
### 准备数据
-使用 'root@test' 用户登陆。
+使用测试用的 test 租户的 root 用户登陆。
```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
```
```sql
@@ -169,6 +160,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102,
false),
### 在 Flink SQL CLI 中使用 Flink DDL 创建表
+注意在 OceanBase 源表的 SQL 中替换 root_service_list 为真实值。
+
```sql
-- 设置间隔时间为3秒
Flink SQL> SET execution.checkpointing.interval = 3s;
@@ -189,13 +182,13 @@ Flink SQL> CREATE TABLE orders (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
- 'password' = 'test',
+ 'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^orders$',
'hostname' = 'localhost',
'port' = '2881',
- 'rootserver-list' = '127.0.0.1:2882:2881',
+ 'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
@@ -211,13 +204,13 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
- 'password' = 'test',
+ 'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^products$',
'hostname' = 'localhost',
'port' = '2881',
- 'rootserver-list' = '127.0.0.1:2882:2881',
+ 'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
diff --git
a/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
b/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
index a9bdc3b98..744e8b79b 100644
--- a/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
+++ b/docs/content/docs/connectors/flink-sources/tutorials/oceanbase-tutorial.md
@@ -37,25 +37,27 @@ under the License.
Create `docker-compose.yml`.
-*Note*: `host` network mode is required in this demo, so it can only work on
Linux, see
[network-tutorial-host](https://docs.docker.com/network/network-tutorial-host/).
-
```yaml
version: '2.1'
services:
observer:
- image: oceanbase/oceanbase-ce:4.2.0.0
+ image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
container_name: observer
environment:
- - 'MODE=slim'
- - 'OB_ROOT_PASSWORD=pswd'
- network_mode: "host"
+ - 'MODE=mini'
+ - 'OB_SYS_PASSWORD=123456'
+ - 'OB_TENANT_PASSWORD=654321'
+ ports:
+ - '2881:2881'
+ - '2882:2882'
oblogproxy:
- image: whhe/oblogproxy:1.1.3_4x
+ image: 'oceanbase/oblogproxy-ce:latest'
container_name: oblogproxy
environment:
- 'OB_SYS_USERNAME=root'
- - 'OB_SYS_PASSWORD=pswd'
- network_mode: "host"
+ - 'OB_SYS_PASSWORD=123456'
+ ports:
+ - '2983:2983'
elasticsearch:
image: 'elastic/elasticsearch:7.6.0'
container_name: elasticsearch
@@ -89,22 +91,18 @@ Execute the following command in the directory where
`docker-compose.yml` is loc
docker-compose up -d
```
-### Set password
+### Query Root Service List
-From OceanBase 4.0.0.0 CE, we can only fetch the commit log of non-sys tenant.
-
-Here we use the 'test' tenant for example.
-
-Login with 'root' user of 'test' tenant:
+Login with 'root' user of 'sys' tenant:
```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
```
-Set a password:
+Query the root service list by following SQL and store the value.
```mysql
-ALTER USER root IDENTIFIED BY 'test';
+SHOW PARAMETERS LIKE 'rootservice_list';
```
### Create data for reading snapshot
@@ -112,7 +110,7 @@ ALTER USER root IDENTIFIED BY 'test';
Login 'root' user of 'test' tenant.
```shell
-docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
+docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
```
Insert data:
@@ -163,6 +161,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102,
false),
### Use Flink DDL to create dynamic table in Flink SQL CLI
+Note that in the SQL of the OceanBase source table, replace root_service_list
with the actual value.
+
```sql
-- checkpoint every 3000 milliseconds
Flink SQL> SET execution.checkpointing.interval = 3s;
@@ -183,13 +183,13 @@ Flink SQL> CREATE TABLE orders (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
- 'password' = 'test',
+ 'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^orders$',
'hostname' = 'localhost',
'port' = '2881',
- 'rootserver-list' = '127.0.0.1:2882:2881',
+ 'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
@@ -205,13 +205,13 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
- 'password' = 'test',
+ 'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^products$',
'hostname' = 'localhost',
'port' = '2881',
- 'rootserver-list' = '127.0.0.1:2882:2881',
+ 'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
index 1ed25b89f..726d7f438 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml
@@ -163,4 +163,21 @@ limitations under the License.
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-jar</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
index c3a4c2ba5..413b7494f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java
@@ -17,20 +17,18 @@
package org.apache.flink.cdc.connectors.oceanbase;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.util.AbstractTestBase;
import org.junit.ClassRule;
-import org.junit.Rule;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
@@ -43,51 +41,13 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Basic class for testing OceanBase source. */
-public abstract class OceanBaseTestBase extends TestLogger {
+public abstract class OceanBaseTestBase extends AbstractTestBase {
private static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)--.*$");
- protected static final int DEFAULT_PARALLELISM = 4;
-
- @Rule
- public final MiniClusterWithClientResource miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
- .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .withHaLeadershipControl()
- .build());
-
@ClassRule public static LegacyRowResource usesLegacyRows =
LegacyRowResource.INSTANCE;
- protected final String compatibleMode;
- protected final String username;
- protected final String password;
- protected final String hostname;
- protected final int port;
- protected final String logProxyHost;
- protected final int logProxyPort;
- protected final String tenant;
-
- public OceanBaseTestBase(
- String compatibleMode,
- String username,
- String password,
- String hostname,
- int port,
- String logProxyHost,
- int logProxyPort,
- String tenant) {
- this.compatibleMode = compatibleMode;
- this.username = username;
- this.password = password;
- this.hostname = hostname;
- this.port = port;
- this.logProxyHost = logProxyHost;
- this.logProxyPort = logProxyPort;
- this.tenant = tenant;
- }
+ protected abstract OceanBaseCdcMetadata metadata();
protected String commonOptionsString() {
return String.format(
@@ -96,8 +56,14 @@ public abstract class OceanBaseTestBase extends TestLogger {
+ " 'password' = '%s', "
+ " 'hostname' = '%s', "
+ " 'port' = '%s', "
- + " 'compatible-mode' = '%s'",
- username, password, hostname, port, compatibleMode);
+ + " 'compatible-mode' = '%s', "
+ + " 'jdbc.driver' = '%s'",
+ metadata().getUsername(),
+ metadata().getPassword(),
+ metadata().getHostname(),
+ metadata().getPort(),
+ metadata().getCompatibleMode(),
+ metadata().getDriverClass());
}
protected String logProxyOptionsString() {
@@ -106,7 +72,9 @@ public abstract class OceanBaseTestBase extends TestLogger {
+ " 'tenant-name' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'",
- tenant, logProxyHost, logProxyPort);
+ metadata().getTenantName(),
+ metadata().getLogProxyHost(),
+ metadata().getLogProxyPort());
}
protected String initialOptionsString() {
@@ -120,7 +88,10 @@ public abstract class OceanBaseTestBase extends TestLogger {
return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString();
}
- protected abstract Connection getJdbcConnection() throws SQLException;
+ protected Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ metadata().getJdbcUrl(), metadata().getUsername(),
metadata().getPassword());
+ }
protected void setGlobalTimeZone(String serverTimeZone) throws
SQLException {
try (Connection connection = getJdbcConnection();
@@ -130,7 +101,8 @@ public abstract class OceanBaseTestBase extends TestLogger {
}
protected void initializeTable(String sqlFile) {
- final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode,
sqlFile);
+ final String ddlFile =
+ String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(),
sqlFile);
final URL ddlTestFile =
getClass().getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java
new file mode 100644
index 000000000..50885089e
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase;
+
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.time.Duration;
+
+/** Utils to help test. */
+@SuppressWarnings("resource")
+public class OceanBaseTestUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OceanBaseTestUtils.class);
+
+ private static final String LATEST_VERSION = "latest";
+ private static final String CDC_TEST_OB_VERSION =
"4.2.1.6-106000012024042515";
+
+ private static final String SYS_PASSWORD = "123456";
+ private static final String TEST_PASSWORD = "654321";
+
+ public static OceanBaseContainer createOceanBaseContainerForCDC() {
+ return createOceanBaseContainer(CDC_TEST_OB_VERSION, "mini")
+ .withSysPassword(SYS_PASSWORD)
+ .withStartupTimeout(Duration.ofMinutes(4));
+ }
+
+ public static OceanBaseContainer createOceanBaseContainerForJdbc() {
+ return createOceanBaseContainer(LATEST_VERSION, "slim")
+ .withStartupTimeout(Duration.ofMinutes(2));
+ }
+
+ public static OceanBaseContainer createOceanBaseContainer(String version,
String mode) {
+ return new OceanBaseContainer(version)
+ .withMode(mode)
+ .withTenantPassword(TEST_PASSWORD)
+ .withEnv("OB_DATAFILE_SIZE", "2G")
+ .withEnv("OB_LOG_DISK_SIZE", "4G")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+
+ public static LogProxyContainer createLogProxyContainer() {
+ return new LogProxyContainer(LATEST_VERSION)
+ .withSysPassword(SYS_PASSWORD)
+ .withStartupTimeout(Duration.ofMinutes(1))
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
index 4388b60af..616ccd5e2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
@@ -18,89 +18,55 @@
package org.apache.flink.cdc.connectors.oceanbase.table;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
+import org.testcontainers.containers.Network;
import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
import java.sql.Statement;
-import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.stream.Stream;
+
+import static
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
+import static
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
/** Integration tests for OceanBase MySQL mode table source. */
-@RunWith(Parameterized.class)
public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class);
-
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().inStreamingMode().build());
- private static final String NETWORK_MODE = "host";
- private static final String OB_SYS_PASSWORD = "123456";
+ @ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
- public static final GenericContainer<?> OB_SERVER =
- new GenericContainer<>("oceanbase/oceanbase-ce:4.2.0.0")
- .withNetworkMode(NETWORK_MODE)
- .withEnv("MODE", "slim")
- .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
- .withEnv("OB_DATAFILE_SIZE", "1G")
- .withEnv("OB_LOG_DISK_SIZE", "4G")
- .withCopyFileToContainer(
-
MountableFile.forClasspathResource("ddl/mysql/docker_init.sql"),
- "/root/boot/init.d/init.sql")
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(4))
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ public static final OceanBaseContainer OB_SERVER =
+ createOceanBaseContainerForCDC().withNetwork(NETWORK);
@ClassRule
- public static final GenericContainer<?> LOG_PROXY =
- new GenericContainer<>("whhe/oblogproxy:1.1.3_4x")
- .withNetworkMode(NETWORK_MODE)
- .withEnv("OB_SYS_PASSWORD", OB_SYS_PASSWORD)
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(1))
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- @BeforeClass
- public static void startContainers() {
- LOG.info("Starting containers...");
- Startables.deepStart(Stream.of(OB_SERVER, LOG_PROXY)).join();
- LOG.info("Containers are started.");
- }
+ public static final LogProxyContainer LOG_PROXY =
+ createLogProxyContainer().withNetwork(NETWORK);
+
+ private static final OceanBaseCdcMetadata METADATA =
+ new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
- @AfterClass
- public static void stopContainers() {
- LOG.info("Stopping containers...");
- Stream.of(OB_SERVER, LOG_PROXY).forEach(GenericContainer::stop);
- LOG.info("Containers are stopped.");
+ @Override
+ protected OceanBaseCdcMetadata metadata() {
+ return METADATA;
}
@Before
@@ -110,47 +76,11 @@ public class OceanBaseMySQLModeITCase extends
OceanBaseTestBase {
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
}
- private final String rsList;
-
- public OceanBaseMySQLModeITCase(
- String username,
- String password,
- String hostname,
- int port,
- String logProxyHost,
- int logProxyPort,
- String tenant,
- String rsList) {
- super("mysql", username, password, hostname, port, logProxyHost,
logProxyPort, tenant);
- this.rsList = rsList;
- }
-
- @Parameterized.Parameters
- public static List<Object[]> parameters() {
- return Collections.singletonList(
- new Object[] {
- "root@test",
- "123456",
- "127.0.0.1",
- 2881,
- "127.0.0.1",
- 2983,
- "test",
- "127.0.0.1:2882:2881"
- });
- }
-
@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
+ " , "
- + String.format(" 'rootserver-list' = '%s'", rsList);
- }
-
- @Override
- protected Connection getJdbcConnection() throws SQLException {
- return DriverManager.getConnection(
- "jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false",
username, password);
+ + String.format(" 'rootserver-list' = '%s'",
METADATA.getRsList());
}
@Test
@@ -312,6 +242,8 @@ public class OceanBaseMySQLModeITCase extends
OceanBaseTestBase {
waitForSinkSize("sink", snapshotSize + 1);
+ String tenant = metadata().getTenantName();
+
List<String> expected =
Arrays.asList(
"+I("
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
index b11da43ae..e7345baba 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.cdc.connectors.oceanbase.table;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseOracleCdcMetadata;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
@@ -26,20 +28,14 @@ import
org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
/** Integration tests for OceanBase Oracle mode table source. */
@Ignore("Test ignored before oceanbase-xe docker image is available")
-@RunWith(Parameterized.class)
public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
private final StreamExecutionEnvironment env =
@@ -48,61 +44,25 @@ public class OceanBaseOracleModeITCase extends
OceanBaseTestBase {
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().inStreamingMode().build());
- private final String schema;
- private final String configUrl;
-
- public OceanBaseOracleModeITCase(
- String username,
- String password,
- String hostname,
- int port,
- String logProxyHost,
- int logProxyPort,
- String tenant,
- String schema,
- String configUrl) {
- super("oracle", username, password, hostname, port, logProxyHost,
logProxyPort, tenant);
- this.schema = schema;
- this.configUrl = configUrl;
- }
-
- @Parameterized.Parameters
- public static List<Object[]> parameters() {
- return Collections.singletonList(
- new Object[] {
- "SYS@test",
- "123456",
- "127.0.0.1",
- 2881,
- "127.0.0.1",
- 2983,
- "test",
- "SYS",
-
"http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster"
- });
- }
+ private static final OceanBaseCdcMetadata METADATA = new
OceanBaseOracleCdcMetadata();
@Override
- protected String commonOptionsString() {
- return super.commonOptionsString() + " , " + " 'jdbc.driver' =
'com.oceanbase.jdbc.Driver'";
+ protected OceanBaseCdcMetadata metadata() {
+ return METADATA;
}
@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
+ " , "
- + String.format(" 'config-url' = '%s'", configUrl);
- }
-
- @Override
- protected Connection getJdbcConnection() throws SQLException {
- return DriverManager.getConnection(
- "jdbc:oceanbase://" + hostname + ":" + port + "/" + schema,
username, password);
+ + String.format(" 'config-url' = '%s'",
METADATA.getConfigUrl());
}
@Test
public void testAllDataTypes() throws Exception {
initializeTable("column_type_test");
+
+ String schema = metadata().getDatabase();
String sourceDDL =
String.format(
"CREATE TABLE full_types ("
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java
new file mode 100644
index 000000000..c33eccbb4
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/LogProxyContainer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import org.jetbrains.annotations.NotNull;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** OceanBase Log Proxy container. */
+public class LogProxyContainer extends GenericContainer<LogProxyContainer> {
+
+ private static final String IMAGE = "oceanbase/oblogproxy-ce";
+
+ private static final int PORT = 2983;
+ private static final String ROOT_USER = "root";
+
+ private String sysPassword;
+
+ public LogProxyContainer(String version) {
+ super(DockerImageName.parse(IMAGE + ":" + version));
+ addExposedPorts(PORT);
+ setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
+ }
+
+ @Override
+ protected void configure() {
+ addEnv("OB_SYS_USERNAME", ROOT_USER);
+ addEnv("OB_SYS_PASSWORD", sysPassword);
+ }
+
+ public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
+ return Collections.singleton(this.getMappedPort(PORT));
+ }
+
+ public int getPort() {
+ return getMappedPort(PORT);
+ }
+
+ public LogProxyContainer withSysPassword(String sysPassword) {
+ this.sysPassword = sysPassword;
+ return this;
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java
new file mode 100644
index 000000000..bb2469509
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseCdcMetadata.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.io.Serializable;
+
+/** OceanBase CDC metadata. */
+public interface OceanBaseCdcMetadata extends Serializable {
+
+ String getCompatibleMode();
+
+ String getHostname();
+
+ int getPort();
+
+ String getUsername();
+
+ String getPassword();
+
+ String getDriverClass();
+
+ String getDatabase();
+
+ String getJdbcUrl();
+
+ String getTenantName();
+
+ String getLogProxyHost();
+
+ int getLogProxyPort();
+
+ default String getConfigUrl() {
+ return null;
+ }
+
+ default String getRsList() {
+ return null;
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java
new file mode 100644
index 000000000..1cd498460
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseContainer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import org.jetbrains.annotations.NotNull;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** OceanBase container. */
+public class OceanBaseContainer extends
JdbcDatabaseContainer<OceanBaseContainer> {
+
+ private static final String IMAGE = "oceanbase/oceanbase-ce";
+
+ private static final int SQL_PORT = 2881;
+ private static final int RPC_PORT = 2882;
+ private static final String ROOT_USER = "root";
+ private static final String TEST_DATABASE = "test";
+ private static final String DEFAULT_TENANT = "test";
+ private static final String DEFAULT_PASSWORD = "";
+
+ private String mode = "mini";
+ private String tenantName = DEFAULT_TENANT;
+ private String sysPassword = DEFAULT_PASSWORD;
+ private String tenantPassword = DEFAULT_PASSWORD;
+
+ public OceanBaseContainer(String version) {
+ super(DockerImageName.parse(IMAGE + ":" + version));
+ addExposedPorts(SQL_PORT, RPC_PORT);
+ setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
+ }
+
+ @Override
+ protected void configure() {
+ addEnv("MODE", mode);
+ addEnv("OB_CLUSTER_NAME", "flink-cdc-ci");
+ if (!DEFAULT_PASSWORD.equals(sysPassword)) {
+ addEnv("OB_SYS_PASSWORD", sysPassword);
+ }
+ if (!DEFAULT_TENANT.equals(tenantName)) {
+ addEnv("OB_TENANT_NAME", tenantName);
+ }
+ if (!DEFAULT_PASSWORD.equals(tenantPassword)) {
+ addEnv("OB_TENANT_PASSWORD", tenantPassword);
+ }
+ }
+
+ protected void waitUntilContainerStarted() {
+ this.getWaitStrategy().waitUntilReady(this);
+ }
+
+ public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
+ return Collections.singleton(this.getMappedPort(SQL_PORT));
+ }
+
+ @Override
+ public String getDriverClassName() {
+ return "com.mysql.cj.jdbc.Driver";
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getDatabasePort()
+ + "/"
+ + databaseName
+ + "?useSSL=false";
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl("");
+ }
+
+ public int getDatabasePort() {
+ return getMappedPort(SQL_PORT);
+ }
+
+ public String getTenantName() {
+ return tenantName;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return TEST_DATABASE;
+ }
+
+ @Override
+ public String getUsername() {
+ return ROOT_USER + "@" + tenantName;
+ }
+
+ @Override
+ public String getPassword() {
+ return tenantPassword;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ public OceanBaseContainer withMode(String mode) {
+ this.mode = mode;
+ return this;
+ }
+
+ public OceanBaseContainer withTenantName(String tenantName) {
+ this.tenantName = tenantName;
+ return this;
+ }
+
+ public OceanBaseContainer withSysPassword(String sysPassword) {
+ this.sysPassword = sysPassword;
+ return this;
+ }
+
+ public OceanBaseContainer withTenantPassword(String tenantPassword) {
+ this.tenantPassword = tenantPassword;
+ return this;
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
new file mode 100644
index 000000000..aede8a292
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** OceanBase CDC MySQL mode metadata. */
+public class OceanBaseMySQLCdcMetadata implements OceanBaseCdcMetadata {
+
+ private final OceanBaseContainer obServerContainer;
+ private final LogProxyContainer logProxyContainer;
+
+ private String rsList;
+
+ public OceanBaseMySQLCdcMetadata(
+ OceanBaseContainer obServerContainer, LogProxyContainer
logProxyContainer) {
+ this.obServerContainer = obServerContainer;
+ this.logProxyContainer = logProxyContainer;
+ }
+
+ @Override
+ public String getCompatibleMode() {
+ return "mysql";
+ }
+
+ @Override
+ public String getHostname() {
+ return obServerContainer.getHost();
+ }
+
+ @Override
+ public int getPort() {
+ return obServerContainer.getDatabasePort();
+ }
+
+ @Override
+ public String getUsername() {
+ return obServerContainer.getUsername();
+ }
+
+ @Override
+ public String getPassword() {
+ return obServerContainer.getPassword();
+ }
+
+ @Override
+ public String getDriverClass() {
+ return obServerContainer.getDriverClassName();
+ }
+
+ @Override
+ public String getDatabase() {
+ return obServerContainer.getDatabaseName();
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return "jdbc:mysql://" + getHostname() + ":" + getPort() +
"/?useSSL=false";
+ }
+
+ @Override
+ public String getTenantName() {
+ return obServerContainer.getTenantName();
+ }
+
+ @Override
+ public String getLogProxyHost() {
+ return logProxyContainer.getHost();
+ }
+
+ @Override
+ public int getLogProxyPort() {
+ return logProxyContainer.getPort();
+ }
+
+ @Override
+ public String getRsList() {
+ if (rsList == null) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ getJdbcUrl(), getUsername(),
getPassword());
+ Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE
'rootservice_list'");
+ rsList = rs.next() ? rs.getString("VALUE") : null;
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to query rs list", e);
+ }
+ if (rsList == null) {
+ throw new RuntimeException("Got empty rs list");
+ }
+ }
+ return rsList;
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
new file mode 100644
index 000000000..c68fdda03
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+/** OceanBase CDC Oracle mode metadata. */
+public class OceanBaseOracleCdcMetadata implements OceanBaseCdcMetadata {
+
+ @Override
+ public String getCompatibleMode() {
+ return "oracle";
+ }
+
+ @Override
+ public String getHostname() {
+ return System.getenv("host");
+ }
+
+ @Override
+ public int getPort() {
+ return Integer.parseInt(System.getenv("port"));
+ }
+
+ @Override
+ public String getUsername() {
+ return System.getenv("username");
+ }
+
+ @Override
+ public String getPassword() {
+ return System.getenv("password");
+ }
+
+ @Override
+ public String getDatabase() {
+ return System.getenv("schema");
+ }
+
+ @Override
+ public String getDriverClass() {
+ return "com.oceanbase.jdbc.Driver";
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return "jdbc:oceanbase://" + getHostname() + ":" + getPort() + "/" +
getDatabase();
+ }
+
+ @Override
+ public String getTenantName() {
+ return System.getenv("tenant");
+ }
+
+ @Override
+ public String getLogProxyHost() {
+ return System.getenv("log_proxy_host");
+ }
+
+ @Override
+ public int getLogProxyPort() {
+ return Integer.parseInt(System.getenv("log_proxy_port"));
+ }
+
+ @Override
+ public String getConfigUrl() {
+ return System.getenv("config_url");
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
new file mode 100644
index 000000000..d189d030f
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Create and populate a unique instance of an OceanBase database for each run
of JUnit test. A user
+ * of class needs to provide a logical name for Debezium and database name. It
is expected that
+ * there is an init file in
<code>src/test/resources/ddl/<database_name>.sql</code>. The
+ * database name is enriched with a unique suffix that guarantees complete
isolation between runs
+ * <code>
+ * <database_name>_<suffix></code>
+ *
+ * <p>This class is inspired from Debezium project.
+ */
+public class UniqueDatabase {
+
+ private static final String[] CREATE_DATABASE_DDL =
+ new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
+ private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS
`$DBNAME$`;";
+ private static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)--.*$");
+
+ private final OceanBaseContainer container;
+ private final String databaseName;
+ private final String templateName;
+
+ public UniqueDatabase(OceanBaseContainer container, String databaseName) {
+ this(container, databaseName, Integer.toUnsignedString(new
Random().nextInt(), 36));
+ }
+
+ private UniqueDatabase(
+ OceanBaseContainer container, String databaseName, final String
identifier) {
+ this.container = container;
+ this.databaseName = databaseName + "_" + identifier;
+ this.templateName = databaseName;
+ }
+
+ public String getHost() {
+ return container.getHost();
+ }
+
+ public int getDatabasePort() {
+ return container.getDatabasePort();
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getUsername() {
+ return container.getUsername();
+ }
+
+ public String getPassword() {
+ return container.getPassword();
+ }
+
+ /** @return Fully qualified table name
<code><databaseName>.<tableName></code> */
+ public String qualifiedTableName(final String tableName) {
+ return String.format("%s.%s", databaseName, tableName);
+ }
+
+ /** Creates the database and populates it with initialization SQL script.
*/
+ public void createAndInitialize() {
+ final String ddlFile = String.format("ddl/%s.sql", templateName);
+ final URL ddlTestFile =
UniqueDatabase.class.getClassLoader().getResource(ddlFile);
+ assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+ try {
+ try (Connection connection =
+ DriverManager.getConnection(
+ container.getJdbcUrl(), getUsername(),
getPassword());
+ Statement statement = connection.createStatement()) {
+ final List<String> statements =
+ Arrays.stream(
+ Stream.concat(
+
Arrays.stream(CREATE_DATABASE_DDL),
+ Files.readAllLines(
+
Paths.get(ddlTestFile.toURI()))
+ .stream())
+ .map(String::trim)
+ .filter(x ->
!x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+
COMMENT_PATTERN.matcher(x);
+ return m.matches()
? m.group(1) : x;
+ })
+ .map(this::convertSQL)
+
.collect(Collectors.joining("\n"))
+ .split(";"))
+ .map(x -> x.replace("$$", ";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /** Drop the database if it is existing. */
+ public void dropDatabase() {
+ try {
+ try (Connection connection =
+ DriverManager.getConnection(
+ container.getJdbcUrl(), getUsername(),
getPassword());
+ Statement statement = connection.createStatement()) {
+ final String dropDatabaseStatement =
convertSQL(DROP_DATABASE_DDL);
+ statement.execute(dropDatabaseStatement);
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ container.getJdbcUrl(databaseName), getUsername(),
getPassword());
+ }
+
+ private String convertSQL(final String sql) {
+ return sql.replace("$DBNAME$", databaseName);
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
deleted file mode 100644
index 0db9c71db..000000000
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
+++ /dev/null
@@ -1,17 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- Set the root user password of test tenant
-ALTER USER root IDENTIFIED BY '123456';
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index d1c0bb7e7..72576bf17 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -102,7 +102,6 @@ public abstract class PipelineTestEnvironment extends
TestLogger {
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
@@ -111,7 +110,6 @@ public abstract class PipelineTestEnvironment extends
TestLogger {
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index c1e50101a..cdb0b43e8 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -91,6 +91,13 @@ limitations under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-oceanbase-cdc</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
diff --git
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
index e257311a5..2cd2f88eb 100644
---
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -19,99 +19,68 @@ package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+import
org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.UniqueDatabase;
import
org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.MountableFile;
-
-import java.net.URL;
-import java.nio.file.Files;
+
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import static org.junit.Assert.assertNotNull;
+import static
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
+import static
org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
/** End-to-end tests for oceanbase-cdc connector uber jar. */
public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
- private static final Logger LOG =
LoggerFactory.getLogger(OceanBaseE2eITCase.class);
-
- private static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)--.*$");
+ private static final String INTER_CONTAINER_OB_SERVER_ALIAS = "oceanbase";
+ private static final String INTER_CONTAINER_LOG_PROXY_ALIAS = "oblogproxy";
private static final Path obCdcJar =
TestUtils.getResource("oceanbase-cdc-connector.jar");
private static final Path mysqlDriverJar =
TestUtils.getResource("mysql-driver.jar");
- //
------------------------------------------------------------------------------------------
- // OceanBase container variables
- //
------------------------------------------------------------------------------------------
- private static final String OB_SERVER_IMAGE =
"oceanbase/oceanbase-ce:4.2.0.0";
- private static final String OB_LOG_PROXY_IMAGE =
"whhe/oblogproxy:1.1.3_4x";
- private static final String NETWORK_MODE = "host";
- private static final String INTER_CONTAINER_OB_HOST =
"host.docker.internal";
- private static final String SYS_PASSWORD = "1234567";
- private static final String TEST_TENANT = "test";
- private static final String TEST_USER = "root@" + TEST_TENANT;
- private static final String TEST_PASSWORD = "7654321";
-
@ClassRule
- public static final GenericContainer<?> OB_SERVER =
- new GenericContainer<>(OB_SERVER_IMAGE)
- .withNetworkMode(NETWORK_MODE)
- .withEnv("MODE", "slim")
- .withEnv("OB_DATAFILE_SIZE", "1G")
- .withEnv("OB_LOG_DISK_SIZE", "4G")
- .withEnv("OB_ROOT_PASSWORD", SYS_PASSWORD)
- .withEnv("OB_TENANT_NAME", TEST_TENANT)
- .withCopyFileToContainer(
-
MountableFile.forClasspathResource("docker/oceanbase/setup.sql"),
- "/root/boot/init.d/init.sql")
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(3))
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ public static final OceanBaseContainer OB_SERVER =
+ createOceanBaseContainerForCDC()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_OB_SERVER_ALIAS);
@ClassRule
- public static final GenericContainer<?> LOG_PROXY =
- new GenericContainer<>(OB_LOG_PROXY_IMAGE)
- .withNetworkMode(NETWORK_MODE)
- .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD)
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(1))
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ public static final LogProxyContainer LOG_PROXY =
+ createLogProxyContainer()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_LOG_PROXY_ALIAS);
+
+ private static final OceanBaseCdcMetadata METADATA =
+ new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
+
+ protected final UniqueDatabase obInventoryDatabase =
+ new UniqueDatabase(OB_SERVER, "oceanbase_inventory");
@Before
public void before() {
super.before();
- initializeTable("oceanbase_inventory");
+ obInventoryDatabase.createAndInitialize();
}
- private Connection getTestConnection(String databaseName) {
- try {
- Class.forName(MYSQL_DRIVER_CLASS);
- return DriverManager.getConnection(
-
String.format("jdbc:mysql://127.0.0.1:2881/%s?useSSL=false", databaseName),
- TEST_USER,
- TEST_PASSWORD);
- } catch (Exception e) {
- throw new RuntimeException("Failed to get test jdbc connection",
e);
- }
+ @After
+ public void after() {
+ super.after();
+
+ obInventoryDatabase.dropDatabase();
}
@Test
@@ -131,16 +100,18 @@ public class OceanBaseE2eITCase extends
FlinkContainerTestEnvironment {
") WITH (",
" 'connector' = 'oceanbase-cdc',",
" 'scan.startup.mode' = 'initial',",
- " 'username' = '" + TEST_USER + "',",
- " 'password' = '" + TEST_PASSWORD + "',",
- " 'tenant-name' = '" + TEST_TENANT + "',",
- " 'table-list' = 'inventory.products_source',",
- " 'hostname' = '" + INTER_CONTAINER_OB_HOST + "',",
+ " 'username' = '" + METADATA.getUsername() + "',",
+ " 'password' = '" + METADATA.getPassword() + "',",
+ " 'tenant-name' = '" + METADATA.getTenantName() + "',",
+ " 'table-list' = '"
+ +
obInventoryDatabase.qualifiedTableName("products_source")
+ + "',",
+ " 'hostname' = '" + INTER_CONTAINER_OB_SERVER_ALIAS +
"',",
" 'port' = '2881',",
- " 'jdbc.driver' = '" + MYSQL_DRIVER_CLASS + "',",
- " 'logproxy.host' = '" + INTER_CONTAINER_OB_HOST +
"',",
+ " 'jdbc.driver' = '" + METADATA.getDriverClass() +
"',",
+ " 'logproxy.host' = '" +
INTER_CONTAINER_LOG_PROXY_ALIAS + "',",
" 'logproxy.port' = '2983',",
- " 'rootserver-list' = '127.0.0.1:2882:2881',",
+ " 'rootserver-list' = '" + METADATA.getRsList() + "',",
" 'working-mode' = 'memory',",
" 'jdbc.properties.useSSL' = 'false'",
");",
@@ -168,7 +139,7 @@ public class OceanBaseE2eITCase extends
FlinkContainerTestEnvironment {
submitSQLJob(sqlLines, obCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
- try (Connection conn = getTestConnection("inventory");
+ try (Connection conn = obInventoryDatabase.getJdbcConnection();
Statement stat = conn.createStatement()) {
stat.execute(
"UPDATE products_source SET description='18oz carpenter
hammer' WHERE id=106;");
@@ -211,32 +182,4 @@ public class OceanBaseE2eITCase extends
FlinkContainerTestEnvironment {
new String[] {"id", "name", "description", "weight", "enum_c",
"json_c"},
60000L);
}
-
- protected void initializeTable(String sqlFile) {
- final String ddlFile = String.format("ddl/%s.sql", sqlFile);
- final URL ddlTestFile =
OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
- assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
- try (Connection connection = getTestConnection("");
- Statement statement = connection.createStatement()) {
- final List<String> statements =
- Arrays.stream(
-
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
- .map(String::trim)
- .filter(x -> !x.startsWith("--")
&& !x.isEmpty())
- .map(
- x -> {
- final Matcher m =
-
COMMENT_PATTERN.matcher(x);
- return m.matches() ?
m.group(1) : x;
- })
- .collect(Collectors.joining("\n"))
- .split(";"))
- .collect(Collectors.toList());
- for (String stmt : statements) {
- statement.execute(stmt);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 91e7e5c6f..49ef039e0 100644
---
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -135,7 +135,6 @@ public abstract class FlinkContainerTestEnvironment extends
TestLogger {
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
@@ -143,7 +142,6 @@ public abstract class FlinkContainerTestEnvironment extends
TestLogger {
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)
diff --git
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
index 6afe82d87..9c4ec5969 100644
---
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
+++
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
@@ -16,13 +16,8 @@
--
----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
--
----------------------------------------------------------------------------------------------------------------
--- Create and populate our products using a single insert with many rows
-DROP DATABASE IF EXISTS inventory;
-
-CREATE DATABASE inventory;
-
-USE inventory;
+-- Create and populate our products using a single insert with many rows
CREATE TABLE products_source (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
diff --git
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
deleted file mode 100644
index ac15e8cb9..000000000
---
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
+++ /dev/null
@@ -1,16 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-ALTER USER root IDENTIFIED BY '7654321';