This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e198a25c [FLINK-34572] Support OceanBase Jdbc Catalog (#109)
e198a25c is described below

commit e198a25c23548431558efdb973cff058da56122a
Author: He Wang <[email protected]>
AuthorDate: Fri Aug 2 09:39:57 2024 +0800

    [FLINK-34572] Support OceanBase Jdbc Catalog (#109)
---
 docs/content.zh/docs/connectors/table/jdbc.md      |  45 ++-
 docs/content/docs/connectors/table/jdbc.md         |  40 ++-
 .../core/database/catalog/AbstractJdbcCatalog.java |  30 +-
 .../cratedb/database/catalog/CrateDBCatalog.java   |   3 +-
 .../jdbc/mysql/database/catalog/MySqlCatalog.java  |   2 +-
 .../jdbc/oceanbase/database/OceanBaseFactory.java  |  33 +-
 .../database/catalog/OceanBaseCatalog.java         | 125 +++----
 .../database/catalog/OceanBaseTypeMapper.java      | 154 +++++++++
 .../database/dialect/OceanBaseCompatibleMode.java  |  48 +++
 .../database/dialect/OceanBaseDialect.java         |  18 +-
 .../dialect/OceanBaseDialectConverter.java         |   5 +-
 .../jdbc/oceanbase/OceanBaseMysqlTestBase.java     |   5 +-
 .../jdbc/oceanbase/OceanBaseOracleTestBase.java    |  17 +-
 .../catalog/OceanBaseCatalogITCaseBase.java        | 261 ++++++++++++++
 .../catalog/OceanBaseMysqlCatalogITCase.java       | 376 +++++++++++++++++++++
 .../catalog/OceanBaseOracleCatalogITCase.java      | 121 +++++++
 .../database/dialect/OceanBaseDialectTest.java     |   4 +-
 .../dialect/OceanBaseOracleDialectTest.java        |  16 +
 .../OceanBaseMySqlDynamicTableSinkITCase.java      |   1 +
 .../OceanBaseMySqlDynamicTableSourceITCase.java    |   1 +
 .../OceanBaseOracleDynamicTableSinkITCase.java     |   3 +-
 .../OceanBaseOracleDynamicTableSourceITCase.java   |   3 +-
 .../jdbc/oceanbase/table/OceanBaseTableRow.java    |   6 +-
 .../oceanbase/testutils/OceanBaseContainer.java    |  75 ++++
 .../oceanbase/testutils/OceanBaseDatabase.java     |  42 +--
 .../jdbc/oceanbase/testutils/OceanBaseImages.java  |   2 +-
 .../postgres/database/catalog/PostgresCatalog.java |   2 +-
 27 files changed, 1291 insertions(+), 147 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/jdbc.md 
b/docs/content.zh/docs/connectors/table/jdbc.md
index f8f517bf..3c04dbea 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -432,10 +432,9 @@ JDBC Catalog
 
 `JdbcCatalog` 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。
 
-目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 
方法。其他方法目前尚不支持。
+目前,JDBC Catalog 有以下实现:Postgres Catalog、MySQL Catalog、CrateDB Catalog 和 
OceanBase Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。
 
 ```java
-// Postgres Catalog & MySQL Catalog 支持的方法
 databaseExists(String databaseName);
 listDatabases();
 getDatabase(String databaseName);
@@ -450,17 +449,19 @@ tableExists(ObjectPath tablePath);
 
 ### JDBC Catalog 的使用
 
-本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。
+本小节主要描述如何创建并使用 JDBC Catalog。
 请参阅 [依赖](#依赖) 部分了解如何配置 JDBC 连接器和相应的驱动。
 
 JDBC catalog 支持以下参数:
 - `name`:必填,catalog 的名称。
 - `default-database`:必填,默认要连接的数据库。
-- `username`:必填,Postgres/MySQL 账户的用户名。
+- `username`:必填,数据库账户的用户名。
 - `password`:必填,账户的密码。
 - `base-url`:必填,(不应该包含数据库名)
   - 对于 Postgres Catalog `base-url` 应为 `"jdbc:postgresql://<ip>:<port>"` 的格式。
   - 对于 MySQL Catalog `base-url` 应为 `"jdbc:mysql://<ip>:<port>"` 的格式。
+  - 对于 OceanBase Catalog `base-url` 应为 `"jdbc:oceanbase://<ip>:<port>"` 的格式。
+- `compatible-mode`: 选填,数据库的兼容模式。
 
 {{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
 {{< tab "SQL" >}}
@@ -656,6 +657,42 @@ SELECT * FROM mycatalog.crate.`custom_schema.test_table2`
 SELECT * FROM crate.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
+
+<a name="jdbc-catalog-for-oceanbase"></a>
+
+### JDBC Catalog for OceanBase
+
+<a name="oceanbase-metaspace-mapping"></a>
+
+#### OceanBase 元空间映射
+
+OceanBase 数据库支持多租户管理,每个租户可以工作在 MySQL 兼容模式或 Oracle 兼容模式。在 OceanBase 的 MySQL 
模式上,一个租户中有数据库和表,就像 MySQL 数据库中的数据库和表一样,但没有 schema。在 OceanBase 的 Oracle 
模式下,一个租户中有 schema 和表,就像 Oracle 数据库中的 schema 和表一样,但没有数据库。
+
+在 Flink 中,查询 OceanBase Catalog 注册的表时,OceanBase MySQL 模式下可以使用 
`database.table_name` 或只使用 `table_name`,OceanBase Oracle 模式下可以使用 
`schema.table_name` 或只使用 `table_name`。
+
+因此,Flink Catalog 和 OceanBase catalog 之间的元空间映射如下:
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL 
Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                   
     | N/A                                         |
+| database name                        | database name                         
     | schema name                                 |
+| table name                           | table name                            
     | table name                                  |
+
+Flink 中的 OceanBase 表的完整路径应该是 ``"`<catalog>`.`<db or schema>`.`<table>`"``。
+
+这里提供了一些访问 OceanBase 表的例子:
+
+```sql
+-- 扫描 默认数据库 'mydb' 中的 'test_table' 表
+SELECT * FROM oceanbase_catalog.mydb.test_table;
+SELECT * FROM mydb.test_table;
+SELECT * FROM test_table;
+
+-- 扫描 'given_database' 数据库中的 'test_table2' 表,
+SELECT * FROM oceanbase_catalog.given_database.test_table2;
+SELECT * FROM given_database.test_table2;
+```
+
 <a name="data-type-mapping"></a>
 
 数据类型映射
diff --git a/docs/content/docs/connectors/table/jdbc.md 
b/docs/content/docs/connectors/table/jdbc.md
index 056f2ab5..258e996c 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -441,10 +441,9 @@ JDBC Catalog
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over 
JDBC protocol.
 
-Currently, there are two JDBC catalog implementations, Postgres Catalog and 
MySQL Catalog. They support the following catalog methods. Other methods are 
currently not supported.
+Currently, there are following JDBC catalog implementations: Postgres Catalog, 
MySQL Catalog, CrateDB Catalog and OceanBase Catalog. They support the 
following catalog methods. Other methods are currently not supported.
 
 ```java
-// The supported methods by Postgres & MySQL Catalog.
 databaseExists(String databaseName);
 listDatabases();
 getDatabase(String databaseName);
@@ -457,17 +456,19 @@ Other `Catalog` methods are currently not supported.
 
 ### Usage of JDBC Catalog
 
-The section mainly describes how to create and use a Postgres Catalog or MySQL 
Catalog.
+The section mainly describes how to create and use a JDBC Catalog.
 Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and the corresponding driver.
 
 The JDBC catalog supports the following options:
 - `name`: required, name of the catalog.
 - `default-database`: required, default database to connect to.
-- `username`: required, username of Postgres/MySQL account.
+- `username`: required, username of database account.
 - `password`: required, password of the account.
 - `base-url`: required (should not contain the database name)
   - for Postgres Catalog this should be `"jdbc:postgresql://<ip>:<port>"`
   - for MySQL Catalog this should be `"jdbc:mysql://<ip>:<port>"`
+  - for OceanBase Catalog this should be `jdbc:oceanbase://<ip>:<port>`
+- `compatible-mode`: optional, the compatible mode of database.
 
 {{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
 {{< tab "SQL" >}}
@@ -654,6 +655,37 @@ SELECT * FROM crate.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
 
+### JDBC Catalog for OceanBase
+
+#### OceanBase Metaspace Mapping
+
+OceanBase database supports multiple tenant management, and each tenant can 
work at MySQL compatible mode or Oracle compatible mode. On MySQL mode of 
OceanBase, there are databases and tables but no schema in one tenant, these 
objects just like databases and tables in the MySQL database. On Oracle mode of 
OceanBase, there are schemas and tables but no database in one tenant, these 
objects just like schemas and tables in the Oracle database.
+
+In Flink, when querying tables registered by OceanBase Catalog, users can use 
either `database.table_name` or just `table_name` on OceanBase MySQL mode, or 
use either `schema.table_name` or just `table_name` on OceanBase Oracle mode.
+
+Therefore, the metaspace mapping between Flink Catalog and OceanBase is as 
following:
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL 
Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                   
     | N/A                                         |
+| database name                        | database name                         
     | schema name                                 |
+| table name                           | table name                            
     | table name                                  |
+
+The full path of OceanBase table in Flink should be "`<catalog>`.`<db or 
schema>`.`<table>`".
+
+Here are some examples to access OceanBase tables:
+
+```sql
+-- scan table 'test_table', the default database or schema is 'mydb'.
+SELECT * FROM oceanbase_catalog.mydb.test_table;
+SELECT * FROM mydb.test_table;
+SELECT * FROM test_table;
+
+-- scan table 'test_table' with the given database or schema.
+SELECT * FROM oceanbase_catalog.given_database.test_table2;
+SELECT * FROM given_database.test_table2;
+```
+
 Data Type Mapping
 ----------------
 Flink supports connect to several databases which uses dialect like MySQL, 
Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby 
dialect usually used for testing purpose. The field data type mappings from 
relational databases data types to Flink SQL data types are listed in the 
following table, the mapping table can help define JDBC table in Flink easily.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
index 7f859145..73424598 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.jdbc.core.database.catalog;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.jdbc.core.table.JdbcDynamicTableFactory;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
@@ -87,6 +88,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Abstract catalog for any JDBC catalogs. */
+@PublicEvolving
 public abstract class AbstractJdbcCatalog extends AbstractCatalog implements 
JdbcCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
@@ -127,7 +129,7 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
 
         this.userClassLoader = userClassLoader;
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
-        this.defaultUrl = this.baseUrl + defaultDatabase;
+        this.defaultUrl = getDatabaseUrl(defaultDatabase);
         this.connectionProperties = 
Preconditions.checkNotNull(connectionProperties);
         checkArgument(
                 
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
@@ -136,6 +138,10 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
                         connectionProperties.getProperty(PASSWORD_KEY)));
     }
 
+    protected String getDatabaseUrl(String databaseName) {
+        return baseUrl + databaseName;
+    }
+
     @Override
     public void open() throws CatalogException {
         // load the Driver use userClassLoader explicitly, see FLINK-15635 for 
more detail
@@ -266,9 +272,9 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
         }
 
         String databaseName = tablePath.getDatabaseName();
-        String dbUrl = baseUrl + databaseName;
 
-        try (Connection conn = DriverManager.getConnection(dbUrl, 
connectionProperties)) {
+        try (Connection conn =
+                DriverManager.getConnection(getDatabaseUrl(databaseName), 
connectionProperties)) {
             DatabaseMetaData metaData = conn.getMetaData();
             Optional<UniqueConstraint> primaryKey =
                     getPrimaryKey(
@@ -299,19 +305,23 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog implements Jdb
                     pk -> schemaBuilder.primaryKeyNamed(pk.getName(), 
pk.getColumns()));
             Schema tableSchema = schemaBuilder.build();
 
-            Map<String, String> props = new HashMap<>();
-            props.put(CONNECTOR.key(), IDENTIFIER);
-            props.put(URL.key(), dbUrl);
-            props.put(USERNAME.key(), 
connectionProperties.getProperty(USER_KEY));
-            props.put(PASSWORD.key(), 
connectionProperties.getProperty(PASSWORD_KEY));
-            props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
-            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), 
props);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), 
getOptions(tablePath));
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed getting table %s", 
tablePath.getFullName()), e);
         }
     }
 
+    protected Map<String, String> getOptions(ObjectPath tablePath) {
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR.key(), IDENTIFIER);
+        props.put(URL.key(), getDatabaseUrl(tablePath.getDatabaseName()));
+        props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
+        props.put(PASSWORD.key(), 
connectionProperties.getProperty(PASSWORD_KEY));
+        props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
+        return props;
+    }
+
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
diff --git 
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
 
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
index a28f3de7..288ac0e7 100644
--- 
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
+++ 
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java
@@ -134,7 +134,8 @@ public class CrateDBCatalog extends PostgresCatalog {
         }
 
         String searchPath =
-                extractColumnValuesBySQL(baseUrl + DEFAULT_DATABASE, "show 
search_path", 1, null)
+                extractColumnValuesBySQL(
+                                getDatabaseUrl(DEFAULT_DATABASE), "show 
search_path", 1, null)
                         .get(0);
         String[] schemas = searchPath.split("\\s*,\\s*");
 
diff --git 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
 
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
index efd16138..27bdf93e 100644
--- 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
+++ 
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
@@ -118,7 +118,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         }
 
         return extractColumnValuesBySQL(
-                baseUrl + databaseName,
+                getDatabaseUrl(databaseName),
                 "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE 
TABLE_SCHEMA = ?",
                 1,
                 null,
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java
index 0ead1184..d5ab4838 100644
--- 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
 import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
 import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseDialect;
 
-import javax.annotation.Nonnull;
-
 /** Factory for {@link OceanBaseDialect}. */
 @Internal
 public class OceanBaseFactory implements JdbcFactory {
@@ -37,13 +37,12 @@ public class OceanBaseFactory implements JdbcFactory {
 
     @Override
     public JdbcDialect createDialect() {
-        throw new UnsupportedOperationException(
-                "Can't create JdbcDialect without compatible mode for 
OceanBase");
+        return createDialect(null);
     }
 
     @Override
-    public JdbcDialect createDialect(@Nonnull String compatibleMode) {
-        return new OceanBaseDialect(compatibleMode);
+    public JdbcDialect createDialect(String compatibleMode) {
+        return new 
OceanBaseDialect(OceanBaseCompatibleMode.parse(compatibleMode));
     }
 
     @Override
@@ -54,6 +53,26 @@ public class OceanBaseFactory implements JdbcFactory {
             String username,
             String pwd,
             String baseUrl) {
-        throw new UnsupportedOperationException("Catalog for OceanBase is not 
supported yet.");
+        return createCatalog(
+                classLoader, catalogName, defaultDatabase, username, pwd, 
baseUrl, null);
+    }
+
+    @Override
+    public JdbcCatalog createCatalog(
+            ClassLoader classLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl,
+            String compatibleMode) {
+        return new OceanBaseCatalog(
+                classLoader,
+                catalogName,
+                OceanBaseCompatibleMode.parse(compatibleMode),
+                defaultDatabase,
+                username,
+                pwd,
+                baseUrl);
     }
 }
diff --git 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalog.java
similarity index 55%
copy from 
flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
copy to 
flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalog.java
index efd16138..384a2c6d 100644
--- 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalog.java
@@ -16,58 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.mysql.database.catalog;
+package org.apache.flink.connector.jdbc.oceanbase.database.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog;
 import 
org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper;
+import org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
+import java.sql.DatabaseMetaData;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static 
org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
 
-/** Catalog for MySQL. */
+/** Catalog for OceanBase. */
 @Internal
-public class MySqlCatalog extends AbstractJdbcCatalog {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlCatalog.class);
-
-    private final JdbcCatalogTypeMapper dialectTypeMapper;
+public class OceanBaseCatalog extends AbstractJdbcCatalog {
 
     private static final Set<String> builtinDatabases =
             new HashSet<String>() {
                 {
+                    add("__public");
                     add("information_schema");
                     add("mysql");
-                    add("performance_schema");
-                    add("sys");
+                    add("oceanbase");
+                    add("LBACSYS");
+                    add("ORAAUDITOR");
                 }
             };
 
-    @VisibleForTesting
-    public MySqlCatalog(
+    private final OceanBaseCompatibleMode compatibleMode;
+    private final JdbcCatalogTypeMapper dialectTypeMapper;
+
+    public OceanBaseCatalog(
             ClassLoader userClassLoader,
             String catalogName,
+            OceanBaseCompatibleMode compatibleMode,
             String defaultDatabase,
             String username,
             String pwd,
@@ -75,39 +74,34 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         this(
                 userClassLoader,
                 catalogName,
+                compatibleMode,
                 defaultDatabase,
                 baseUrl,
                 getBriefAuthProperties(username, pwd));
     }
 
-    public MySqlCatalog(
+    public OceanBaseCatalog(
             ClassLoader userClassLoader,
             String catalogName,
+            OceanBaseCompatibleMode compatibleMode,
             String defaultDatabase,
             String baseUrl,
             Properties connectionProperties) {
         super(userClassLoader, catalogName, defaultDatabase, baseUrl, 
connectionProperties);
-
-        String driverVersion =
-                Preconditions.checkNotNull(getDriverVersion(), "Driver version 
must not be null.");
-        String databaseVersion =
-                Preconditions.checkNotNull(
-                        getDatabaseVersion(), "Database version must not be 
null.");
-        LOG.info("Driver version: {}, database version: {}", driverVersion, 
databaseVersion);
-        this.dialectTypeMapper = new MySqlTypeMapper(databaseVersion, 
driverVersion);
+        this.compatibleMode = compatibleMode;
+        this.dialectTypeMapper = new OceanBaseTypeMapper(compatibleMode);
     }
 
     @Override
     public List<String> listDatabases() throws CatalogException {
+        String query =
+                compatibleMode.isMySQLMode()
+                        ? "SELECT `SCHEMA_NAME` FROM 
`INFORMATION_SCHEMA`.`SCHEMATA`"
+                        : "SELECT USERNAME FROM ALL_USERS";
         return extractColumnValuesBySQL(
-                defaultUrl,
-                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
-                1,
-                dbName -> !builtinDatabases.contains(dbName));
+                defaultUrl, query, 1, dbName -> 
!builtinDatabases.contains(dbName));
     }
 
-    // ------ tables ------
-
     @Override
     public List<String> listTables(String databaseName)
             throws DatabaseNotExistException, CatalogException {
@@ -116,21 +110,24 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         if (!databaseExists(databaseName)) {
             throw new DatabaseNotExistException(getName(), databaseName);
         }
-
-        return extractColumnValuesBySQL(
-                baseUrl + databaseName,
-                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE 
TABLE_SCHEMA = ?",
-                1,
-                null,
-                databaseName);
+        String sql =
+                compatibleMode.isMySQLMode()
+                        ? "SELECT TABLE_NAME FROM information_schema.`TABLES` 
WHERE TABLE_SCHEMA = ?"
+                        : "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?";
+        return extractColumnValuesBySQL(defaultUrl, sql, 1, null, 
databaseName);
     }
 
     @Override
     public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        String query =
+                compatibleMode.isMySQLMode()
+                        ? "SELECT TABLE_NAME FROM information_schema.`TABLES` "
+                                + "WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?"
+                        : "SELECT TABLE_NAME FROM ALL_TABLES "
+                                + "WHERE OWNER = ? and TABLE_NAME = ?";
         return !extractColumnValuesBySQL(
-                        baseUrl,
-                        "SELECT TABLE_NAME FROM information_schema.`TABLES` "
-                                + "WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        defaultUrl,
+                        query,
                         1,
                         null,
                         tablePath.getDatabaseName(),
@@ -138,35 +135,25 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                 .isEmpty();
     }
 
-    private String getDatabaseVersion() {
-        try (TemporaryClassLoaderContext ignored =
-                TemporaryClassLoaderContext.of(userClassLoader)) {
-            try (Connection conn = DriverManager.getConnection(defaultUrl, 
connectionProperties)) {
-                return conn.getMetaData().getDatabaseProductVersion();
-            } catch (Exception e) {
-                throw new CatalogException(
-                        String.format("Failed in getting MySQL version by 
%s.", defaultUrl), e);
-            }
+    @Override
+    protected Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String database, String schema, String 
table)
+            throws SQLException {
+        if (compatibleMode.isMySQLMode()) {
+            return super.getPrimaryKey(metaData, database, null, table);
+        } else {
+            return super.getPrimaryKey(metaData, null, database, table);
         }
     }
 
-    private String getDriverVersion() {
-        try (TemporaryClassLoaderContext ignored =
-                TemporaryClassLoaderContext.of(userClassLoader)) {
-            try (Connection conn = DriverManager.getConnection(defaultUrl, 
connectionProperties)) {
-                String driverVersion = conn.getMetaData().getDriverVersion();
-                Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
-                Matcher matcher = regexp.matcher(driverVersion);
-                return matcher.find() ? matcher.group(0) : null;
-            } catch (Exception e) {
-                throw new CatalogException(
-                        String.format("Failed in getting MySQL driver version 
by %s.", defaultUrl),
-                        e);
-            }
-        }
+    @Override
+    protected Map<String, String> getOptions(ObjectPath tablePath) {
+        Map<String, String> options = super.getOptions(tablePath);
+        options.put(JdbcConnectorOptions.COMPATIBLE_MODE.key(), 
compatibleMode.toString());
+        return options;
     }
 
-    /** Converts MySQL type to Flink {@link DataType}. */
+    /** Converts OceanBase type to Flink {@link DataType}. */
     @Override
     protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData 
metadata, int colIndex)
             throws SQLException {
@@ -180,7 +167,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected String getSchemaName(ObjectPath tablePath) {
-        return tablePath.getDatabaseName();
+        return null;
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseTypeMapper.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseTypeMapper.java
new file mode 100644
index 00000000..7703e72a
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseTypeMapper.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.catalog;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+
+/** OceanBaseTypeMapper util class. */
+@Internal
+public class OceanBaseTypeMapper implements JdbcCatalogTypeMapper {
+
+    private static final int RAW_TIME_LENGTH = 10;
+    private static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final int TYPE_BINARY_FLOAT = 100;
+    private static final int TYPE_BINARY_DOUBLE = 101;
+
+    private final OceanBaseCompatibleMode compatibleMode;
+
+    public OceanBaseTypeMapper(OceanBaseCompatibleMode compatibleMode) {
+        this.compatibleMode = compatibleMode;
+    }
+
+    @Override
+    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, 
int colIndex)
+            throws SQLException {
+        String typeName = metadata.getColumnTypeName(colIndex).toUpperCase();
+        int jdbcType = metadata.getColumnType(colIndex);
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (jdbcType) {
+            case Types.BIT:
+                return DataTypes.BOOLEAN();
+            case Types.TINYINT:
+                return isUnsignedType(typeName) || precision > 4
+                        ? DataTypes.SMALLINT()
+                        : DataTypes.TINYINT();
+            case Types.SMALLINT:
+                return isUnsignedType(typeName) ? DataTypes.INT() : 
DataTypes.SMALLINT();
+            case Types.INTEGER:
+                return !typeName.toUpperCase().startsWith("MEDIUMINT") && 
isUnsignedType(typeName)
+                        ? DataTypes.BIGINT()
+                        : DataTypes.INT();
+            case Types.BIGINT:
+                return isUnsignedType(typeName) ? DataTypes.DECIMAL(20, 0) : 
DataTypes.BIGINT();
+            case Types.FLOAT:
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                if (compatibleMode.isMySQLMode()) {
+                    return isUnsignedType(typeName)
+                            ? getDecimalType(precision + 1, scale)
+                            : getDecimalType(precision, scale);
+                }
+                return getNumericType(precision, scale);
+            case Types.REAL:
+            case TYPE_BINARY_FLOAT:
+                return DataTypes.FLOAT();
+            case Types.DOUBLE:
+            case TYPE_BINARY_DOUBLE:
+                return DataTypes.DOUBLE();
+            case Types.DATE:
+                return "YEAR".equals(typeName) ? DataTypes.INT() : 
DataTypes.DATE();
+            case Types.TIME:
+                return isExplicitPrecision(precision, RAW_TIME_LENGTH)
+                        ? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
+                        : DataTypes.TIME(0);
+            case Types.TIMESTAMP:
+                return typeName.equalsIgnoreCase("DATE")
+                        ? DataTypes.DATE()
+                        : isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH)
+                                ? DataTypes.TIMESTAMP(precision - 
RAW_TIMESTAMP_LENGTH - 1)
+                                : DataTypes.TIMESTAMP(0);
+            case Types.CHAR:
+            case Types.NCHAR:
+                return DataTypes.CHAR(precision);
+            case Types.VARCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGVARCHAR:
+                return precision > 0 ? DataTypes.VARCHAR(precision) : 
DataTypes.STRING();
+            case Types.CLOB:
+                return DataTypes.STRING();
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                return DataTypes.BYTES();
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support type '%s' on column '%s'.", 
typeName, columnName));
+        }
+    }
+
+    private DataType getNumericType(int precision, int scale) {
+        if (precision == 0) {
+            return DataTypes.STRING();
+        }
+        if (scale <= 0) {
+            int width = precision - scale;
+            if (width < 3) {
+                return DataTypes.TINYINT();
+            } else if (width < 5) {
+                return DataTypes.SMALLINT();
+            } else if (width < 10) {
+                return DataTypes.INT();
+            } else if (width < 19) {
+                return DataTypes.BIGINT();
+            }
+        }
+        return getDecimalType(precision, scale);
+    }
+
+    private DataType getDecimalType(int precision, int scale) {
+        if (precision >= DecimalType.MAX_PRECISION || precision == 0) {
+            return DataTypes.STRING();
+        }
+        return DataTypes.DECIMAL(precision, scale);
+    }
+
+    private boolean isUnsignedType(String typeName) {
+        return typeName.toUpperCase().contains("UNSIGNED");
+    }
+
+    private boolean isExplicitPrecision(int precision, int defaultPrecision) {
+        return precision > defaultPrecision
+                && (precision - defaultPrecision - 1 <= 
(compatibleMode.isMySQLMode() ? 6 : 9));
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseCompatibleMode.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseCompatibleMode.java
new file mode 100644
index 00000000..e352c6f3
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseCompatibleMode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.dialect;
+
+/** Compatible mode of OceanBase. */
+public enum OceanBaseCompatibleMode {
+    MySQL,
+    Oracle;
+
+    public static OceanBaseCompatibleMode parse(String text) {
+        if (text == null || text.trim().isEmpty()) {
+            return OceanBaseCompatibleMode.MySQL;
+        }
+        switch (text.trim().toLowerCase()) {
+            case "mysql":
+                return OceanBaseCompatibleMode.MySQL;
+            case "oracle":
+                return OceanBaseCompatibleMode.Oracle;
+            default:
+                throw new IllegalArgumentException("Unsupported compatible 
mode: " + text);
+        }
+    }
+
+    public boolean isMySQLMode() {
+        return this == MySQL;
+    }
+
+    @Override
+    public String toString() {
+        return this.name().toLowerCase();
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialect.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialect.java
index 41914b47..8b8b591b 100644
--- 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialect.java
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialect.java
@@ -26,8 +26,6 @@ import 
org.apache.flink.connector.jdbc.oracle.database.dialect.OracleDialect;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
-import javax.annotation.Nonnull;
-
 import java.util.EnumSet;
 import java.util.Optional;
 import java.util.Set;
@@ -40,17 +38,11 @@ public class OceanBaseDialect extends AbstractDialect {
 
     private final AbstractDialect dialect;
 
-    public OceanBaseDialect(@Nonnull String compatibleMode) {
-        switch (compatibleMode.toLowerCase()) {
-            case "mysql":
-                this.dialect = new MySqlDialect();
-                break;
-            case "oracle":
-                this.dialect = new OracleDialect();
-                break;
-            default:
-                throw new IllegalArgumentException(
-                        "Unsupported compatible mode: " + compatibleMode);
+    public OceanBaseDialect(OceanBaseCompatibleMode compatibleMode) {
+        if (compatibleMode.isMySQLMode()) {
+            this.dialect = new MySqlDialect();
+        } else {
+            this.dialect = new OracleDialect();
         }
     }
 
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectConverter.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectConverter.java
index 69bf6e3d..b2bec919 100644
--- 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectConverter.java
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectConverter.java
@@ -69,7 +69,10 @@ public class OceanBaseDialectConverter extends 
AbstractDialectConverter {
             case SMALLINT:
                 return val -> val instanceof Number ? ((Number) 
val).shortValue() : val;
             case INTEGER:
-                return val -> val instanceof Number ? ((Number) 
val).intValue() : val;
+                return val ->
+                        val instanceof Number
+                                ? ((Number) val).intValue()
+                                : val instanceof Date ? ((Date) 
val).toLocalDate().getYear() : val;
             case BIGINT:
                 return val -> val instanceof Number ? ((Number) 
val).longValue() : val;
             case DECIMAL:
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseMysqlTestBase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseMysqlTestBase.java
index ae99523e..9a1b2dcd 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseMysqlTestBase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseMysqlTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.jdbc.oceanbase;
 
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
 import org.apache.flink.connector.jdbc.oceanbase.table.OceanBaseTableRow;
 import org.apache.flink.connector.jdbc.oceanbase.testutils.OceanBaseDatabase;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
@@ -31,8 +32,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(OceanBaseDatabase.class)
 public interface OceanBaseMysqlTestBase extends DatabaseTest {
 
-    default TableRow tableRow(String name, TableField... fields) {
-        return new OceanBaseTableRow("mysql", name, fields);
+    static TableRow tableRow(String name, TableField... fields) {
+        return new OceanBaseTableRow(OceanBaseCompatibleMode.MySQL, name, 
fields);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseOracleTestBase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseOracleTestBase.java
index 7aeb71e0..3ac413d0 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseOracleTestBase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/OceanBaseOracleTestBase.java
@@ -18,25 +18,28 @@
 
 package org.apache.flink.connector.jdbc.oceanbase;
 
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
 import org.apache.flink.connector.jdbc.oceanbase.table.OceanBaseTableRow;
-import org.apache.flink.connector.jdbc.oceanbase.testutils.OceanBaseDatabase;
+import org.apache.flink.connector.jdbc.oceanbase.testutils.OceanBaseMetadata;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
 import org.apache.flink.connector.jdbc.testutils.tables.TableField;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 
-import org.junit.jupiter.api.extension.ExtendWith;
-
 /** Base class for OceanBase Oracle mode testing. */
-@ExtendWith(OceanBaseDatabase.class)
 public interface OceanBaseOracleTestBase extends DatabaseTest {
 
-    default TableRow tableRow(String name, TableField... fields) {
-        return new OceanBaseTableRow("oracle", name, fields);
+    static TableRow tableRow(String name, TableField... fields) {
+        return new OceanBaseTableRow(OceanBaseCompatibleMode.Oracle, name, 
fields);
     }
 
     @Override
     default DatabaseMetadata getMetadata() {
-        return OceanBaseDatabase.getMetadata();
+        return new OceanBaseMetadata(
+                System.getenv("test.oceanbase.username"),
+                System.getenv("test.oceanbase.password"),
+                System.getenv("test.oceanbase.url"),
+                "com.oceanbase.jdbc.Driver",
+                "test");
     }
 }
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java
new file mode 100644
index 00000000..c0bc6b76
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.catalog;
+
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test base for {@link OceanBaseCatalog}. */
+public abstract class OceanBaseCatalogITCaseBase implements JdbcITCaseBase, 
DatabaseTest {
+
+    private final String catalogName;
+    private final OceanBaseCompatibleMode compatibleMode;
+    private final String defaultDatabase;
+
+    public OceanBaseCatalogITCaseBase(
+            String catalogName, OceanBaseCompatibleMode compatibleMode, String 
defaultDatabase) {
+        this.catalogName = catalogName;
+        this.compatibleMode = compatibleMode;
+        this.defaultDatabase = defaultDatabase;
+    }
+
+    protected OceanBaseCatalog catalog;
+    protected TableEnvironment tEnv;
+
+    protected abstract TableRow allTypesSourceTable();
+
+    protected abstract TableRow allTypesSinkTable();
+
+    protected abstract List<Row> allTypesTableRows();
+
+    protected void before() {}
+
+    protected void after() {}
+
+    @BeforeEach
+    void setup() {
+        before();
+
+        try (Connection conn = getMetadata().getConnection()) {
+            allTypesSourceTable().insertIntoTableValues(conn, 
allTypesTableRows());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        catalog =
+                new OceanBaseCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        catalogName,
+                        compatibleMode,
+                        defaultDatabase,
+                        getMetadata()
+                                .getJdbcUrl()
+                                .substring(0, 
getMetadata().getJdbcUrl().lastIndexOf("/")),
+                        getBriefAuthProperties(
+                                getMetadata().getUsername(), 
getMetadata().getPassword()));
+
+        tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.registerCatalog(catalogName, catalog);
+        tEnv.useCatalog(catalogName);
+    }
+
+    @AfterEach
+    void cleanup() {
+        after();
+    }
+
+    @Test
+    void testGetDb_DatabaseNotExistException() {
+        String databaseNotExist = "nonexistent";
+        assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
+                .satisfies(
+                        anyCauseMatches(
+                                DatabaseNotExistException.class,
+                                String.format(
+                                        "Database %s does not exist in 
Catalog",
+                                        databaseNotExist)));
+    }
+
+    @Test
+    void testDbExists() {
+        String databaseNotExist = "nonexistent";
+        assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
+        assertThat(catalog.databaseExists(defaultDatabase)).isTrue();
+    }
+
+    // ------ tables ------
+
+    @Test
+    void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(defaultDatabase);
+        assertThat(actual)
+                .containsAll(
+                        getManagedTables().stream()
+                                .map(TableManaged::getTableName)
+                                .collect(Collectors.toList()));
+    }
+
+    @Test
+    void testListTables_DatabaseNotExistException() {
+        String anyDatabase = "anyDatabase";
+        assertThatThrownBy(() -> catalog.listTables(anyDatabase))
+                .satisfies(
+                        anyCauseMatches(
+                                DatabaseNotExistException.class,
+                                String.format(
+                                        "Database %s does not exist in 
Catalog", anyDatabase)));
+    }
+
+    @Test
+    void testTableExists() {
+        String tableNotExist = "nonexist";
+        assertThat(catalog.tableExists(new ObjectPath(defaultDatabase, 
tableNotExist))).isFalse();
+        assertThat(
+                        catalog.tableExists(
+                                new ObjectPath(
+                                        defaultDatabase, 
allTypesSourceTable().getTableName())))
+                .isTrue();
+    }
+
+    @Test
+    void testGetTables_TableNotExistException() {
+        String anyTableNotExist = "anyTable";
+        assertThatThrownBy(
+                        () -> catalog.getTable(new ObjectPath(defaultDatabase, 
anyTableNotExist)))
+                .satisfies(
+                        anyCauseMatches(
+                                TableNotExistException.class,
+                                String.format(
+                                        "Table (or view) %s.%s does not exist 
in Catalog",
+                                        defaultDatabase, anyTableNotExist)));
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoDb() {
+        String databaseNotExist = "nonexistdb";
+        String tableNotExist = "anyTable";
+        assertThatThrownBy(() -> catalog.getTable(new 
ObjectPath(databaseNotExist, tableNotExist)))
+                .satisfies(
+                        anyCauseMatches(
+                                TableNotExistException.class,
+                                String.format(
+                                        "Table (or view) %s.%s does not exist 
in Catalog",
+                                        databaseNotExist, tableNotExist)));
+    }
+
+    @Test
+    void testGetTable() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(
+                        new ObjectPath(defaultDatabase, 
allTypesSourceTable().getTableName()));
+        assertThat(table.getUnresolvedSchema().getColumns())
+                
.isEqualTo(allTypesSourceTable().getTableSchema().getColumns());
+    }
+
+    // ------ test select query. ------
+
+    @Test
+    void testWithoutCatalogDB() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s",
+                                                
allTypesSourceTable().getTableName()))
+                                .execute()
+                                .collect());
+
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+
+    @Test
+    void testWithoutCatalog() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`.`%s`",
+                                                defaultDatabase,
+                                                
allTypesSourceTable().getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+
+    @Test
+    void testFullPath() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s.%s.`%s`",
+                                                catalogName,
+                                                defaultDatabase,
+                                                
allTypesSourceTable().getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+
+    @Test
+    void testSelectToInsert() throws Exception {
+        String sql =
+                String.format(
+                        "insert into `%s` select * from `%s`",
+                        allTypesSinkTable().getTableName(), 
allTypesSourceTable().getTableName());
+        tEnv.executeSql(sql).await();
+
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s",
+                                                
allTypesSinkTable().getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseMysqlCatalogITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseMysqlCatalogITCase.java
new file mode 100644
index 00000000..7231521a
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseMysqlCatalogITCase.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.catalog;
+
+import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase.tableRow;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** E2E test for {@link OceanBaseCatalog} with OceanBase MySql mode. */
+public class OceanBaseMysqlCatalogITCase extends OceanBaseCatalogITCaseBase
+        implements OceanBaseMysqlTestBase {
+
+    private static final String DEFAULT_DB = "test";
+    private static final String TEST_DB = "catalog_test";
+
+    private static final TableRow TABLE_ALL_TYPES = 
createTableAllTypeTable("t_all_types");
+    private static final TableRow TABLE_ALL_TYPES_SINK =
+            createTableAllTypeTable("t_all_types_sink");
+    private static final TableRow TABLE_GROUPED_BY_SINK = 
createGroupedTable("t_grouped_by_sink");
+    private static final TableRow TABLE_PK = createGroupedTable("t_pk");
+    private static final TableRow TABLE_PK2 =
+            TableBuilder.tableRow(
+                    "t_pk",
+                    pkField(
+                            "pid",
+                            dbType("int(11) NOT NULL AUTO_INCREMENT"),
+                            DataTypes.BIGINT().notNull()),
+                    field("col_varchar", dbType("varchar(255)"), 
DataTypes.BIGINT()));
+
+    private static final List<Row> TABLE_ALL_TYPES_ROWS =
+            Arrays.asList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            2024,
+                            LocalDateTime.parse("2021-08-04T01:54:16"),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:54:16"),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Short.parseShort("1"),
+                            null,
+                            "col_varchar",
+                            LocalDateTime.parse("2021-08-04T01:54:16.463"),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:54:16.463"),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            2024,
+                            LocalDateTime.parse("2021-08-04T01:53:19"),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:53:19"),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Short.parseShort("1"),
+                            null,
+                            "col_varchar",
+                            LocalDateTime.parse("2021-08-04T01:53:19.098"),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:53:19.098"),
+                            null));
+
+    private static TableRow createTableAllTypeTable(String tableName) {
+        return tableRow(
+                tableName,
+                pkField(
+                        "pid",
+                        dbType("bigint(20) NOT NULL AUTO_INCREMENT"),
+                        DataTypes.BIGINT().notNull()),
+                field("col_bigint", dbType("bigint(20)"), DataTypes.BIGINT()),
+                field(
+                        "col_bigint_unsigned",
+                        dbType("bigint(20) unsigned"),
+                        DataTypes.DECIMAL(20, 0)),
+                field("col_binary", dbType("binary(100)"), DataTypes.BYTES()),
+                field("col_bit", dbType("bit(1)"), DataTypes.BOOLEAN()),
+                field("col_blob", dbType("blob"), DataTypes.BYTES()),
+                field("col_char", dbType("char(10)"), DataTypes.CHAR(10)),
+                field("col_date", dbType("date"), DataTypes.DATE()),
+                field("col_year", dbType("year"), DataTypes.INT()),
+                field(
+                        "col_datetime",
+                        dbType("datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP"),
+                        DataTypes.TIMESTAMP(0)),
+                field("col_decimal", dbType("decimal(10,0)"), 
DataTypes.DECIMAL(10, 0)),
+                field(
+                        "col_decimal_unsigned",
+                        dbType("decimal(10,0) unsigned"),
+                        DataTypes.DECIMAL(11, 0)),
+                field("col_double", dbType("double"), DataTypes.DOUBLE()),
+                field("col_double_unsigned", dbType("double unsigned"), 
DataTypes.DOUBLE()),
+                field("col_enum", dbType("enum('enum1','enum2','enum11')"), 
DataTypes.VARCHAR(6)),
+                field("col_float", dbType("float"), DataTypes.FLOAT()),
+                field("col_float_unsigned", dbType("float unsigned"), 
DataTypes.FLOAT()),
+                field("col_int", dbType("int(11)"), DataTypes.INT()),
+                field("col_int_unsigned", dbType("int(10) unsigned"), 
DataTypes.BIGINT()),
+                field("col_integer", dbType("int(11)"), DataTypes.INT()),
+                field("col_integer_unsigned", dbType("int(10) unsigned"), 
DataTypes.BIGINT()),
+                field("col_longblob", dbType("longblob"), DataTypes.BYTES()),
+                field(
+                        "col_longtext",
+                        dbType("longtext CHARACTER SET utf8mb4 COLLATE 
utf8mb4_bin"),
+                        DataTypes.STRING()),
+                field("col_mediumblob", dbType("mediumblob"), 
DataTypes.BYTES()),
+                field("col_mediumint", dbType("mediumint(9)"), 
DataTypes.INT()),
+                field("col_mediumint_unsigned", dbType("mediumint(8) 
unsigned"), DataTypes.INT()),
+                field("col_mediumtext", dbType("mediumtext"), 
DataTypes.VARCHAR(16777215)),
+                field("col_numeric", dbType("decimal(10,0)"), 
DataTypes.DECIMAL(10, 0)),
+                field(
+                        "col_numeric_unsigned",
+                        dbType("decimal(10,0) unsigned"),
+                        DataTypes.DECIMAL(11, 0)),
+                field("col_real", dbType("double"), DataTypes.DOUBLE()),
+                field("col_real_unsigned", dbType("double unsigned"), 
DataTypes.DOUBLE()),
+                field("col_set", dbType("set('set_ele1','set_ele12')"), 
DataTypes.VARCHAR(18)),
+                field("col_smallint", dbType("smallint(6)"), 
DataTypes.SMALLINT()),
+                field("col_smallint_unsigned", dbType("smallint(5) unsigned"), 
DataTypes.INT()),
+                field("col_text", dbType("text"), DataTypes.VARCHAR(65535)),
+                field("col_time", dbType("time"), DataTypes.TIME(0)),
+                field(
+                        "col_timestamp",
+                        dbType(
+                                "timestamp NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP"),
+                        DataTypes.TIMESTAMP(0)),
+                field("col_tinytext", dbType("tinytext"), 
DataTypes.VARCHAR(255)),
+                field("col_tinyint", dbType("tinyint"), DataTypes.TINYINT()),
+                field(
+                        "col_tinyint_unsigned",
+                        dbType("tinyint(255) unsigned"),
+                        DataTypes.SMALLINT()),
+                field("col_tinyblob", dbType("tinyblob"), DataTypes.BYTES()),
+                field("col_varchar", dbType("varchar(255)"), 
DataTypes.VARCHAR(255)),
+                field(
+                        "col_datetime_p3",
+                        dbType(
+                                "datetime(3) NOT NULL DEFAULT 
CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3)"),
+                        DataTypes.TIMESTAMP(3).notNull()),
+                field("col_time_p3", dbType("time(3)"), DataTypes.TIME(3)),
+                field(
+                        "col_timestamp_p3",
+                        dbType(
+                                "timestamp(3) NULL DEFAULT 
CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3)"),
+                        DataTypes.TIMESTAMP(3)),
+                field("col_varbinary", dbType("varbinary(255)"), 
DataTypes.BYTES()));
+    }
+
+    private static TableRow createGroupedTable(String tableName) {
+        return tableRow(
+                tableName,
+                pkField(
+                        "pid",
+                        dbType("bigint(20) NOT NULL AUTO_INCREMENT"),
+                        DataTypes.BIGINT().notNull()),
+                field("col_bigint", dbType("bigint(20)"), DataTypes.BIGINT()));
+    }
+
+    public OceanBaseMysqlCatalogITCase() {
+        super("oceanbase_mysql_catalog", OceanBaseCompatibleMode.MySQL, 
DEFAULT_DB);
+    }
+
+    @Override
+    protected TableRow allTypesSourceTable() {
+        return TABLE_ALL_TYPES;
+    }
+
+    @Override
+    protected TableRow allTypesSinkTable() {
+        return TABLE_ALL_TYPES_SINK;
+    }
+
+    @Override
+    protected List<Row> allTypesTableRows() {
+        return TABLE_ALL_TYPES_ROWS;
+    }
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Arrays.asList(
+                TABLE_ALL_TYPES, TABLE_ALL_TYPES_SINK, TABLE_GROUPED_BY_SINK, 
TABLE_PK);
+    }
+
+    @Override
+    protected void before() {
+        try (Connection conn = getMetadata().getConnection();
+                Statement st = conn.createStatement()) {
+            st.execute(String.format("CREATE DATABASE `%s` CHARSET=utf8", 
TEST_DB));
+            st.execute(String.format("use `%s`", TEST_DB));
+            st.execute(TABLE_PK2.getCreateQuery());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void after() {
+        try (Connection conn = getMetadata().getConnection();
+                Statement st = conn.createStatement()) {
+            st.execute(String.format("DROP DATABASE IF EXISTS `%s`", TEST_DB));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+        assertThat(actual).containsExactlyInAnyOrder(DEFAULT_DB, TEST_DB);
+    }
+
+    @Test
+    void testGetTablePrimaryKey() throws TableNotExistException {
+        // test the PK of test.t_user
+        Schema tableSchemaTestPK1 = TABLE_PK.getTableSchema();
+        CatalogBaseTable tablePK1 =
+                catalog.getTable(new ObjectPath(TEST_DB, 
TABLE_PK.getTableName()));
+        assertThat(tableSchemaTestPK1.getPrimaryKey())
+                .isEqualTo(tablePK1.getUnresolvedSchema().getPrimaryKey());
+
+        // test the PK of TEST_DB.t_user
+        Schema tableSchemaTestPK2 = TABLE_PK2.getTableSchema();
+        CatalogBaseTable tablePK2 =
+                catalog.getTable(new ObjectPath(TEST_DB, 
TABLE_PK2.getTableName()));
+        assertThat(tableSchemaTestPK2.getPrimaryKey())
+                .isEqualTo(tablePK2.getUnresolvedSchema().getPrimaryKey());
+    }
+
+    @Test
+    void testSelectField() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select pid from %s",
+                                                
TABLE_ALL_TYPES.getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results)
+                .isEqualTo(
+                        Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1L), 
Row.ofKind(RowKind.INSERT, 2L)));
+    }
+
+    @Test
+    void testGroupByInsert() throws Exception {
+        // Changes primary key for the next record.
+        tEnv.executeSql(
+                        String.format(
+                                "insert into `%s` select max(`pid`) `pid`, 
`col_bigint` from `%s` "
+                                        + "group by `col_bigint` ",
+                                TABLE_GROUPED_BY_SINK.getTableName(),
+                                TABLE_ALL_TYPES.getTableName()))
+                .await();
+
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`",
+                                                
TABLE_GROUPED_BY_SINK.getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results)
+                
.isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, 2L, -1L)));
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseOracleCatalogITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseOracleCatalogITCase.java
new file mode 100644
index 00000000..f487c2d9
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseOracleCatalogITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.catalog;
+
+import org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase;
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Disabled;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase.tableRow;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** E2E test for {@link OceanBaseCatalog} with OceanBase Oracle mode. */
+@Disabled("OceanBase Oracle mode can only be tested locally.")
+public class OceanBaseOracleCatalogITCase extends OceanBaseCatalogITCaseBase
+        implements OceanBaseOracleTestBase {
+
+    private static final String SCHEMA = "SYS";
+
+    private static final TableRow TABLE_ALL_TYPES = 
createTableAllTypeTable("T_ALL_TYPES");
+    private static final TableRow TABLE_ALL_TYPES_SINK =
+            createTableAllTypeTable("T_ALL_TYPES_SINK");
+
+    private static final List<Row> TABLE_ALL_TYPES_ROWS =
+            Arrays.asList(
+                    Row.of(
+                            1L,
+                            BigDecimal.valueOf(100.1234),
+                            "1.12345",
+                            1.175E-10F,
+                            1.79769E+40D,
+                            "a",
+                            "abc",
+                            "abcdef",
+                            LocalDate.parse("1997-01-01"),
+                            LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+                            
LocalDateTime.parse("2020-01-01T15:35:00.123456789"),
+                            "Hello World"),
+                    Row.of(
+                            2L,
+                            BigDecimal.valueOf(101.1234),
+                            "1.12345",
+                            1.175E-10F,
+                            1.79769E+40D,
+                            "a",
+                            "abc",
+                            "abcdef",
+                            LocalDate.parse("1997-01-02"),
+                            LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+                            
LocalDateTime.parse("2020-01-01T15:36:01.123456789"),
+                            "Hey Leonard"));
+
+    private static TableRow createTableAllTypeTable(String tableName) {
+        return tableRow(
+                tableName,
+                pkField("ID", dbType("NUMBER(18,0)"), 
DataTypes.BIGINT().notNull()),
+                field("DECIMAL_COL", dbType("NUMBER(10,4)"), 
DataTypes.DECIMAL(10, 4)),
+                field("FLOAT_COL", dbType("FLOAT"), DataTypes.STRING()),
+                field("BINARY_FLOAT_COL", dbType("BINARY_FLOAT"), 
DataTypes.FLOAT()),
+                field("BINARY_DOUBLE_COL", dbType("BINARY_DOUBLE"), 
DataTypes.DOUBLE()),
+                field("CHAR_COL", dbType("CHAR"), DataTypes.CHAR(1)),
+                field("NCHAR_COL", dbType("NCHAR(3)"), DataTypes.CHAR(3)),
+                field("VARCHAR2_COL", dbType("VARCHAR2(30)"), 
DataTypes.VARCHAR(30)),
+                field("DATE_COL", dbType("DATE"), DataTypes.DATE()),
+                field("TIMESTAMP6_COL", dbType("TIMESTAMP"), 
DataTypes.TIMESTAMP(6)),
+                field("TIMESTAMP9_COL", dbType("TIMESTAMP(9)"), 
DataTypes.TIMESTAMP(9)),
+                field("CLOB_COL", dbType("CLOB"), DataTypes.STRING()));
+    }
+
+    public OceanBaseOracleCatalogITCase() {
+        super("oceanbase_oracle_catalog", OceanBaseCompatibleMode.Oracle, 
SCHEMA);
+    }
+
+    @Override
+    protected TableRow allTypesSourceTable() {
+        return TABLE_ALL_TYPES;
+    }
+
+    @Override
+    protected TableRow allTypesSinkTable() {
+        return TABLE_ALL_TYPES_SINK;
+    }
+
+    @Override
+    protected List<Row> allTypesTableRows() {
+        return TABLE_ALL_TYPES_ROWS;
+    }
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Arrays.asList(TABLE_ALL_TYPES, TABLE_ALL_TYPES_SINK);
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectTest.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectTest.java
index 07143d1a..6b1f17de 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectTest.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseDialectTest.java
@@ -27,7 +27,7 @@ class OceanBaseDialectTest {
 
     @Test
     void testMysqlAppendDefaultUrlProperties() {
-        OceanBaseDialect dialect = new OceanBaseDialect("mysql");
+        OceanBaseDialect dialect = new 
OceanBaseDialect(OceanBaseCompatibleMode.MySQL);
         String jdbcUrl = "jdbc:oceanbase://localhost:2883/foo";
 
         assertThat(dialect.appendDefaultUrlProperties(jdbcUrl))
@@ -44,7 +44,7 @@ class OceanBaseDialectTest {
 
     @Test
     void testOracleAppendDefaultUrlProperties() {
-        OceanBaseDialect dialect = new OceanBaseDialect("oracle");
+        OceanBaseDialect dialect = new 
OceanBaseDialect(OceanBaseCompatibleMode.Oracle);
         String jdbcUrl = "jdbc:oceanbase://localhost:2883/foo";
 
         
assertThat(dialect.appendDefaultUrlProperties(jdbcUrl)).isEqualTo(jdbcUrl);
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseOracleDialectTest.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseOracleDialectTest.java
index a8f3d579..eb87b132 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseOracleDialectTest.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/dialect/OceanBaseOracleDialectTest.java
@@ -20,6 +20,8 @@ package 
org.apache.flink.connector.jdbc.oceanbase.database.dialect;
 
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest;
 import org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase;
+import org.apache.flink.connector.jdbc.oceanbase.testutils.OceanBaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
 
 import java.util.Arrays;
 import java.util.List;
@@ -38,6 +40,20 @@ class OceanBaseOracleDialectTest extends JdbcDialectTest 
implements OceanBaseOra
                         + ")";
     }
 
+    @Override
+    public DatabaseMetadata getMetadata() {
+        OceanBaseMetadata metadata =
+                (OceanBaseMetadata) 
OceanBaseOracleTestBase.super.getMetadata();
+        return metadata.getJdbcUrl() != null
+                ? metadata
+                : new OceanBaseMetadata(
+                        "user",
+                        "password",
+                        "jdbc:oceanbase://host:port/schema",
+                        "com.oceanbase.jdbc.Driver",
+                        "test");
+    }
+
     @Override
     protected List<TestItem> testData() {
         return Arrays.asList(
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
index ff6522ad..c55319ef 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.DataTypes;
 
 import java.util.Map;
 
+import static 
org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase.tableRow;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
index f32d3150..5c1972a5 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
@@ -30,6 +30,7 @@ import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase.tableRow;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
index 03b91ae8..ac1e3ba5 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
@@ -31,12 +31,13 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase.tableRow;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
 
 /** The Table Sink ITCase for OceanBase Oracle mode. */
-@Disabled
+@Disabled("OceanBase Oracle mode can only be tested locally.")
 class OceanBaseOracleDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase
         implements OceanBaseOracleTestBase {
 
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
index b4ad8a56..5e3a5ea2 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
@@ -32,11 +32,12 @@ import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase.tableRow;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 
 /** The Table Source ITCase for OceanBase Oracle mode. */
-@Disabled
+@Disabled("OceanBase Oracle mode can only be tested locally.")
 class OceanBaseOracleDynamicTableSourceITCase extends 
JdbcDynamicTableSourceITCase
         implements OceanBaseOracleTestBase {
 
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseTableRow.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseTableRow.java
index acffc0d0..e6c7cd2f 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseTableRow.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseTableRow.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.jdbc.oceanbase.table;
 
+import 
org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.testutils.tables.TableField;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
@@ -28,9 +29,10 @@ import java.util.List;
 /** TableRow for OceanBase. */
 public class OceanBaseTableRow extends TableRow {
 
-    private final String compatibleMode;
+    private final OceanBaseCompatibleMode compatibleMode;
 
-    public OceanBaseTableRow(String compatibleMode, String name, TableField[] 
fields) {
+    public OceanBaseTableRow(
+            OceanBaseCompatibleMode compatibleMode, String name, TableField[] 
fields) {
         super(name, fields);
         this.compatibleMode = compatibleMode;
     }
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseContainer.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseContainer.java
new file mode 100644
index 00000000..a31f469a
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseContainer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.testutils;
+
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * JdbcDatabaseContainer for latest Docker images, can be removed after 
testcontainers 1.20.1 is
+ * released.
+ */
+public class OceanBaseContainer extends 
org.testcontainers.oceanbase.OceanBaseCEContainer {
+
+    static final String DOCKER_IMAGE_NAME = "oceanbase/oceanbase-ce";
+
+    private static final DockerImageName DEFAULT_IMAGE_NAME =
+            DockerImageName.parse(DOCKER_IMAGE_NAME);
+
+    private static final Integer SQL_PORT = 2881;
+
+    private static final Integer RPC_PORT = 2882;
+
+    private static final String DEFAULT_PASSWORD = "";
+
+    private String password = DEFAULT_PASSWORD;
+
+    public OceanBaseContainer(String dockerImageName) {
+        this(DockerImageName.parse(dockerImageName));
+    }
+
+    public OceanBaseContainer(DockerImageName dockerImageName) {
+        super(dockerImageName);
+        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+
+        addExposedPorts(SQL_PORT, RPC_PORT);
+        setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
+    }
+
+    @Override
+    protected void configure() {
+        addEnv("MODE", "slim");
+        addEnv("OB_TENANT_PASSWORD", password);
+    }
+
+    @Override
+    protected void waitUntilContainerStarted() {
+        getWaitStrategy().waitUntilReady(this);
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    public OceanBaseContainer withPassword(String password) {
+        this.password = password;
+        return this;
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
index c07ad5c2..726cd41e 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
@@ -27,23 +27,36 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.oceanbase.OceanBaseCEContainer;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 
 /** OceanBase database for testing. */
 public class OceanBaseDatabase extends DatabaseExtension implements 
OceanBaseImages {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseDatabase.class);
 
+    private static final String ZONE_OFFSET =
+            DateTimeFormatter.ofPattern("xxx")
+                    
.format(ZoneId.systemDefault().getRules().getOffset(Instant.now()));
+
     private static final OceanBaseCEContainer CONTAINER =
-            new OceanBaseCEContainer(OCEANBASE_CE_4)
-                    .withEnv("MODE", "slim")
-                    .withEnv("FASTBOOT", "true")
-                    .withEnv("OB_DATAFILE_SIZE", "1G")
-                    .withEnv("OB_LOG_DISK_SIZE", "4G")
+            new OceanBaseContainer(OCEANBASE_CE_4)
+                    .withPassword("123456")
+                    .withUrlParam("useSSL", "false")
+                    .withUrlParam("serverTimezone", ZONE_OFFSET)
+                    .withCopyToContainer(
+                            Transferable.of(
+                                    String.format("SET GLOBAL time_zone = 
'%s';", ZONE_OFFSET)),
+                            "/root/boot/init.d/init.sql")
+                    .waitingFor(
+                            Wait.forLogMessage(".*boot success!.*", 1)
+                                    .withStartupTimeout(Duration.ofMinutes(2)))
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
     private static OceanBaseMetadata metadata;
@@ -65,17 +78,6 @@ public class OceanBaseDatabase extends DatabaseExtension 
implements OceanBaseIma
 
     @Override
     protected DatabaseResource getResource() {
-        return new DockerResource(CONTAINER) {
-            @Override
-            public void start() {
-                super.start();
-                try (Connection connection = getMetadata().getConnection();
-                        Statement statement = connection.createStatement()) {
-                    statement.execute("SET GLOBAL time_zone = '+00:00'");
-                } catch (SQLException e) {
-                    throw new FlinkRuntimeException(e);
-                }
-            }
-        };
+        return new DockerResource(CONTAINER);
     }
 }
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseImages.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseImages.java
index e167bad5..f62e9ac8 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseImages.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseImages.java
@@ -20,5 +20,5 @@ package org.apache.flink.connector.jdbc.oceanbase.testutils;
 
 /** OceanBase docker images. */
 public interface OceanBaseImages {
-    String OCEANBASE_CE_4 = "oceanbase/oceanbase-ce:4.2.2";
+    String OCEANBASE_CE_4 = "oceanbase/oceanbase-ce:4.2.1-lts";
 }
diff --git 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
index 537332be..def48d64 100644
--- 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
+++ 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java
@@ -187,7 +187,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
             throw new DatabaseNotExistException(getName(), databaseName);
         }
 
-        final String url = baseUrl + databaseName;
+        final String url = getDatabaseUrl(databaseName);
         try (Connection conn = DriverManager.getConnection(url, 
connectionProperties)) {
             // get all schemas
             List<String> schemas;

Reply via email to