This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3130ae089e [Fix][Connector-V2][OceanBase] Remove OceanBase catalog's 
dependency on mysql driver (#7311)
3130ae089e is described below

commit 3130ae089e65bc71fa61784fad7d282f92aeadaf
Author: xxsc0529 <[email protected]>
AuthorDate: Fri Aug 9 20:02:06 2024 +0800

    [Fix][Connector-V2][OceanBase] Remove OceanBase catalog's dependency on 
mysql driver (#7311)
---
 .../catalog/oceanbase/OceanBaseCatalogFactory.java |   5 +
 .../catalog/oceanbase/OceanBaseMySqlCatalog.java   | 193 ++++++-
 .../OceanBaseMysqlCreateTableSqlBuilder.java       | 271 ++++++++++
 .../dialect/oceanbase/OceanBaseDialectFactory.java |   3 +-
 .../oceanbase/OceanBaseMySqlTypeConverter.java     | 513 +++++++++++++++++++
 .../oceanbase/OceanBaseMySqlTypeMapper.java        |  72 +++
 .../dialect/oceanbase/OceanBaseMysqlDialect.java   | 290 +++++++++++
 ...ry.java => OceanBaseMysqlJdbcRowConverter.java} |  37 +-
 .../dialect/oceanbase/OceanBaseMysqlType.java      | 567 +++++++++++++++++++++
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |   2 +
 .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java       |   8 +-
 11 files changed, 1927 insertions(+), 34 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
index 58dfa5b884..01d035e167 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
@@ -31,6 +31,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
 
 import org.apache.commons.lang3.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.auto.service.AutoService;
 
 import java.util.Optional;
@@ -38,6 +41,8 @@ import java.util.Optional;
 @AutoService(Factory.class)
 public class OceanBaseCatalogFactory implements CatalogFactory {
 
+    private static final Logger log = 
LoggerFactory.getLogger(OceanBaseCatalogFactory.class);
+
     @Override
     public String factoryIdentifier() {
         return DatabaseIdentifier.OCENABASE;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 58cdb5c413..08aa0faea0 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -17,10 +17,44 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
 
-public class OceanBaseMySqlCatalog extends MySqlCatalog {
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
+@Slf4j
+public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog {
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 
'%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";
+
+    private static final String SELECT_DATABASE_EXISTS =
+            "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE 
SCHEMA_NAME = '%s'";
+
+    private static final String SELECT_TABLE_EXISTS =
+            "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables 
WHERE table_schema = '%s' AND table_name = '%s'";
 
     static {
         SYS_DATABASES.clear();
@@ -32,8 +66,161 @@ public class OceanBaseMySqlCatalog extends MySqlCatalog {
         SYS_DATABASES.add("SYS");
     }
 
+    private OceanBaseMySqlTypeConverter typeConverter;
+
     public OceanBaseMySqlCatalog(
             String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
-        super(catalogName, username, pwd, urlInfo);
+        super(catalogName, username, pwd, urlInfo, null);
+        this.typeConverter = new OceanBaseMySqlTypeConverter();
+    }
+
+    @Override
+    protected String getDatabaseWithConditionSql(String databaseName) {
+        return String.format(SELECT_DATABASE_EXISTS, databaseName);
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected String getListDatabaseSql() {
+        return "SHOW DATABASES;";
+    }
+
+    @Override
+    protected String getListTableSql(String databaseName) {
+        return "SHOW TABLES;";
+    }
+
+    @Override
+    protected String getTableName(ResultSet rs) throws SQLException {
+        return rs.getString(1);
+    }
+
+    @Override
+    protected String getTableName(TablePath tablePath) {
+        return tablePath.getTableName();
+    }
+
+    @Override
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+        return TableIdentifier.of(
+                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        List<ConstraintKey> indexList =
+                super.getConstraintKeys(
+                        metaData,
+                        tablePath.getDatabaseName(),
+                        tablePath.getSchemaName(),
+                        tablePath.getTableName());
+        for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext(); 
) {
+            ConstraintKey index = it.next();
+            if 
(ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType())
+                    && "PRIMARY".equals(index.getConstraintName())) {
+                it.remove();
+            }
+        }
+        return indexList;
+    }
+
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
+        String columnName = resultSet.getString("COLUMN_NAME");
+        // e.g. tinyint(1) unsigned
+        String columnType = resultSet.getString("COLUMN_TYPE");
+        // e.g. tinyint
+        String dataType = resultSet.getString("DATA_TYPE").toUpperCase();
+        String comment = resultSet.getString("COLUMN_COMMENT");
+        Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
+        String isNullableStr = resultSet.getString("IS_NULLABLE");
+        boolean isNullable = isNullableStr.equals("YES");
+        // e.g. `decimal(10, 2)` is 10
+        long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
+        // e.g. `decimal(10, 2)` is 2
+        int numberScale = resultSet.getInt("NUMERIC_SCALE");
+        // e.g. `varchar(10)` is 40
+        long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
+        // e.g. `timestamp(3)` is 3
+        //        int timePrecision =
+        //                MySqlVersion.V_5_5.equals(version) ? 0 :
+        // resultSet.getInt("DATETIME_PRECISION");
+        int timePrecision = resultSet.getInt("DATETIME_PRECISION");
+        Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 
0));
+        Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));
+
+        OceanBaseMysqlType oceanbaseMysqlType = 
OceanBaseMysqlType.getByName(columnType);
+        boolean unsigned = 
columnType.toLowerCase(Locale.ROOT).contains("unsigned");
+
+        BasicTypeDefine<OceanBaseMysqlType> typeDefine =
+                BasicTypeDefine.<OceanBaseMysqlType>builder()
+                        .name(columnName)
+                        .columnType(columnType)
+                        .dataType(dataType)
+                        .nativeType(oceanbaseMysqlType)
+                        .unsigned(unsigned)
+                        .length(Math.max(charOctetLength, numberPrecision))
+                        .precision(numberPrecision)
+                        .scale(Math.max(numberScale, timePrecision))
+                        .nullable(isNullable)
+                        .defaultValue(defaultValue)
+                        .comment(comment)
+                        .build();
+        return typeConverter.convert(typeDefine);
+    }
+
+    @Override
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        return OceanBaseMysqlCreateTableSqlBuilder.builder(tablePath, table, 
typeConverter)
+                .build(table.getCatalogName());
+    }
+
+    @Override
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format(
+                "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected String getCreateDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE `%s`;", databaseName);
+    }
+
+    @Override
+    protected String getDropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE `%s`;", databaseName);
+    }
+
+    @Override
+    public CatalogTable getTable(String sqlQuery) throws SQLException {
+        Connection defaultConnection = getConnection(defaultUrl);
+        Statement statement = defaultConnection.createStatement();
+        ResultSetMetaData metaData = 
statement.executeQuery(sqlQuery).getMetaData();
+        return CatalogUtils.getCatalogTable(
+                metaData, new OceanBaseMySqlTypeMapper(typeConverter), 
sqlQuery);
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) throws 
CatalogException {
+        return String.format(
+                "TRUNCATE TABLE `%s`.`%s`;", tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    public String getExistDataSql(TablePath tablePath) {
+        return String.format(
+                "SELECT * FROM `%s`.`%s` LIMIT 1;",
+                tablePath.getDatabaseName(), tablePath.getTableName());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..bc3413dbd8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.SqlType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class OceanBaseMysqlCreateTableSqlBuilder {
+
+    private final String tableName;
+    private List<Column> columns;
+
+    private String comment;
+
+    private String engine;
+    private String charset;
+    private String collate;
+
+    private PrimaryKey primaryKey;
+
+    private List<ConstraintKey> constraintKeys;
+
+    private String fieldIde;
+
+    private final OceanBaseMySqlTypeConverter typeConverter;
+
+    private OceanBaseMysqlCreateTableSqlBuilder(
+            String tableName, OceanBaseMySqlTypeConverter typeConverter) {
+        checkNotNull(tableName, "tableName must not be null");
+        this.tableName = tableName;
+        this.typeConverter = typeConverter;
+    }
+
+    public static OceanBaseMysqlCreateTableSqlBuilder builder(
+            TablePath tablePath,
+            CatalogTable catalogTable,
+            OceanBaseMySqlTypeConverter typeConverter) {
+        checkNotNull(tablePath, "tablePath must not be null");
+        checkNotNull(catalogTable, "catalogTable must not be null");
+
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        checkNotNull(tableSchema, "tableSchema must not be null");
+
+        return new 
OceanBaseMysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter)
+                .comment(catalogTable.getComment())
+                // todo: set charset and collate
+                .engine(null)
+                .charset(null)
+                .primaryKey(tableSchema.getPrimaryKey())
+                .constraintKeys(tableSchema.getConstraintKeys())
+                .addColumn(tableSchema.getColumns())
+                .fieldIde(catalogTable.getOptions().get("fieldIde"));
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder addColumn(List<Column> columns) 
{
+        checkArgument(CollectionUtils.isNotEmpty(columns), "columns must not 
be empty");
+        this.columns = columns;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder primaryKey(PrimaryKey 
primaryKey) {
+        this.primaryKey = primaryKey;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder fieldIde(String fieldIde) {
+        this.fieldIde = fieldIde;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder 
constraintKeys(List<ConstraintKey> constraintKeys) {
+        this.constraintKeys = constraintKeys;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder engine(String engine) {
+        this.engine = engine;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder charset(String charset) {
+        this.charset = charset;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder collate(String collate) {
+        this.collate = collate;
+        return this;
+    }
+
+    public OceanBaseMysqlCreateTableSqlBuilder comment(String comment) {
+        this.comment = comment;
+        return this;
+    }
+
+    public String build(String catalogName) {
+        List<String> sqls = new ArrayList<>();
+        sqls.add(
+                String.format(
+                        "CREATE TABLE %s (\n%s\n)",
+                        CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"),
+                        buildColumnsIdentifySql(catalogName)));
+        if (engine != null) {
+            sqls.add("ENGINE = " + engine);
+        }
+        if (charset != null) {
+            sqls.add("DEFAULT CHARSET = " + charset);
+        }
+        if (collate != null) {
+            sqls.add("COLLATE = " + collate);
+        }
+        if (comment != null) {
+            sqls.add("COMMENT = '" + comment + "'");
+        }
+        return String.join(" ", sqls) + ";";
+    }
+
+    private String buildColumnsIdentifySql(String catalogName) {
+        List<String> columnSqls = new ArrayList<>();
+        Map<String, String> columnTypeMap = new HashMap<>();
+        for (Column column : columns) {
+            columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, 
columnTypeMap));
+        }
+        if (primaryKey != null) {
+            columnSqls.add("\t" + buildPrimaryKeySql());
+        }
+        if (CollectionUtils.isNotEmpty(constraintKeys)) {
+            for (ConstraintKey constraintKey : constraintKeys) {
+                if (StringUtils.isBlank(constraintKey.getConstraintName())) {
+                    continue;
+                }
+                String constraintKeyStr = buildConstraintKeySql(constraintKey, 
columnTypeMap);
+                if (StringUtils.isNotBlank(constraintKeyStr)) {
+                    columnSqls.add("\t" + constraintKeyStr);
+                }
+            }
+        }
+        return String.join(", \n", columnSqls);
+    }
+
+    private String buildColumnIdentifySql(
+            Column column, String catalogName, Map<String, String> 
columnTypeMap) {
+        final List<String> columnSqls = new ArrayList<>();
+        columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), 
fieldIde, "`"));
+        String type;
+        if ((SqlType.TIME.equals(column.getDataType().getSqlType())
+                        || 
SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
+                && column.getScale() != null) {
+            BasicTypeDefine<OceanBaseMysqlType> typeDefine = 
typeConverter.reconvert(column);
+            type = typeDefine.getColumnType();
+        } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)
+                && StringUtils.isNotBlank(column.getSourceType())) {
+            type = column.getSourceType();
+        } else {
+            BasicTypeDefine<OceanBaseMysqlType> typeDefine = 
typeConverter.reconvert(column);
+            type = typeDefine.getColumnType();
+        }
+        columnSqls.add(type);
+        columnTypeMap.put(column.getName(), type);
+        // nullable
+        if (column.isNullable()) {
+            columnSqls.add("NULL");
+        } else {
+            columnSqls.add("NOT NULL");
+        }
+
+        if (column.getComment() != null) {
+            columnSqls.add(
+                    "COMMENT '"
+                            + column.getComment().replace("'", 
"''").replace("\\", "\\\\")
+                            + "'");
+        }
+
+        return String.join(" ", columnSqls);
+    }
+
+    private String buildPrimaryKeySql() {
+        String key =
+                primaryKey.getColumnNames().stream()
+                        .map(columnName -> "`" + columnName + "`")
+                        .collect(Collectors.joining(", "));
+        // add sort type
+        return String.format("PRIMARY KEY (%s)", 
CatalogUtils.quoteIdentifier(key, fieldIde));
+    }
+
+    private String buildConstraintKeySql(
+            ConstraintKey constraintKey, Map<String, String> columnTypeMap) {
+        ConstraintKey.ConstraintType constraintType = 
constraintKey.getConstraintType();
+        String indexColumns =
+                constraintKey.getColumnNames().stream()
+                        .map(
+                                constraintKeyColumn -> {
+                                    String columnName = 
constraintKeyColumn.getColumnName();
+                                    boolean withLength = false;
+                                    if (columnTypeMap.containsKey(columnName)) 
{
+                                        String columnType = 
columnTypeMap.get(columnName);
+                                        if (columnType.endsWith("BLOB")
+                                                || 
columnType.endsWith("TEXT")) {
+                                            withLength = true;
+                                        }
+                                    }
+                                    if (constraintKeyColumn.getSortType() == 
null) {
+                                        return String.format(
+                                                "`%s`%s",
+                                                
CatalogUtils.getFieldIde(columnName, fieldIde),
+                                                withLength ? "(255)" : "");
+                                    }
+                                    return String.format(
+                                            "`%s`%s %s",
+                                            
CatalogUtils.getFieldIde(columnName, fieldIde),
+                                            withLength ? "(255)" : "",
+                                            
constraintKeyColumn.getSortType().name());
+                                })
+                        .collect(Collectors.joining(", "));
+        String keyName = null;
+        switch (constraintType) {
+            case INDEX_KEY:
+                keyName = "KEY";
+                break;
+            case UNIQUE_KEY:
+                keyName = "UNIQUE KEY";
+                break;
+            case FOREIGN_KEY:
+                keyName = "FOREIGN KEY";
+                // todo:
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported constraint type: " + constraintType);
+        }
+        return String.format(
+                "%s `%s` (%s)", keyName, constraintKey.getConstraintName(), 
indexColumns);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
index b3a456870c..d25d48b4f2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbas
 
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
 
 import com.google.auto.service.AutoService;
@@ -44,6 +43,6 @@ public class OceanBaseDialectFactory implements 
JdbcDialectFactory {
         if ("oracle".equalsIgnoreCase(compatibleMode)) {
             return new OracleDialect();
         }
-        return new MysqlDialect();
+        return new OceanBaseMysqlDialect();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
new file mode 100644
index 0000000000..4e9fa04d0d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TypeConverter.class)
+public class OceanBaseMySqlTypeConverter
+        implements TypeConverter<BasicTypeDefine<OceanBaseMysqlType>> {
+
+    // ============================data types=====================
+    static final String MYSQL_NULL = "NULL";
+    static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    static final String MYSQL_TINYINT = "TINYINT";
+    static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    static final String MYSQL_SMALLINT = "SMALLINT";
+    static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    static final String MYSQL_INT = "INT";
+    static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    static final String MYSQL_INTEGER = "INTEGER";
+    static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    static final String MYSQL_BIGINT = "BIGINT";
+    static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    static final String MYSQL_DECIMAL = "DECIMAL";
+    static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    static final String MYSQL_FLOAT = "FLOAT";
+    static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    static final String MYSQL_DOUBLE = "DOUBLE";
+    static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    static final String MYSQL_TINYTEXT = "TINYTEXT";
+    static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    static final String MYSQL_TEXT = "TEXT";
+    static final String MYSQL_LONGTEXT = "LONGTEXT";
+    static final String MYSQL_JSON = "JSON";
+    static final String MYSQL_ENUM = "ENUM";
+
+    // ------------------------------time-------------------------
+    static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    static final String MYSQL_TINYBLOB = "TINYBLOB";
+    static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    static final String MYSQL_BLOB = "BLOB";
+    static final String MYSQL_LONGBLOB = "LONGBLOB";
+    static final String MYSQL_BINARY = "BINARY";
+    static final String MYSQL_VARBINARY = "VARBINARY";
+    static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    public static final int DEFAULT_PRECISION = 38;
+    public static final int MAX_PRECISION = 65;
+    public static final int DEFAULT_SCALE = 18;
+    public static final int MAX_SCALE = 30;
+    public static final int MAX_TIME_SCALE = 6;
+    public static final int MAX_TIMESTAMP_SCALE = 6;
+    public static final long POWER_2_8 = (long) Math.pow(2, 8);
+    public static final long POWER_2_16 = (long) Math.pow(2, 16);
+    public static final long POWER_2_24 = (long) Math.pow(2, 24);
+    public static final long POWER_2_32 = (long) Math.pow(2, 32);
+    public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
+
+    @Override
+    public String identifier() {
+        return DatabaseIdentifier.OCENABASE;
+    }
+
+    @Override
+    public Column convert(BasicTypeDefine typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .sourceType(typeDefine.getColumnType())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+
+        String mysqlDataType = typeDefine.getDataType().toUpperCase();
+        if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED"))) 
{
+            mysqlDataType = mysqlDataType + " UNSIGNED";
+        }
+        switch (mysqlDataType) {
+            case MYSQL_NULL:
+                builder.dataType(BasicType.VOID_TYPE);
+                break;
+            case MYSQL_BIT:
+                if (typeDefine.getLength() == null || typeDefine.getLength() 
<= 0) {
+                    builder.dataType(BasicType.BOOLEAN_TYPE);
+                } else if (typeDefine.getLength() == 1) {
+                    builder.dataType(BasicType.BOOLEAN_TYPE);
+                } else {
+                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                    // BIT(M) -> BYTE(M/8)
+                    long byteLength = typeDefine.getLength() / 8;
+                    byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
+                    builder.columnLength(byteLength);
+                }
+                break;
+            case MYSQL_TINYINT:
+                if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)")) 
{
+                    builder.dataType(BasicType.BOOLEAN_TYPE);
+                } else {
+                    builder.dataType(BasicType.BYTE_TYPE);
+                }
+                break;
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                builder.dataType(BasicType.SHORT_TYPE);
+                break;
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+            case MYSQL_YEAR:
+                builder.dataType(BasicType.INT_TYPE);
+                break;
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                builder.dataType(BasicType.LONG_TYPE);
+                break;
+            case MYSQL_BIGINT_UNSIGNED:
+                DecimalType intDecimalType = new DecimalType(20, 0);
+                builder.dataType(intDecimalType);
+                
builder.columnLength(Long.valueOf(intDecimalType.getPrecision()));
+                builder.scale(intDecimalType.getScale());
+                break;
+            case MYSQL_FLOAT:
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case MYSQL_FLOAT_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", 
MYSQL_FLOAT_UNSIGNED);
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case MYSQL_DOUBLE:
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case MYSQL_DOUBLE_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", 
MYSQL_DOUBLE_UNSIGNED);
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case MYSQL_DECIMAL:
+                Preconditions.checkArgument(typeDefine.getPrecision() > 0);
+
+                DecimalType decimalType;
+                if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
+                    log.warn("{} will probably cause value overflow.", 
MYSQL_DECIMAL);
+                    decimalType = new DecimalType(DEFAULT_PRECISION, 
DEFAULT_SCALE);
+                } else {
+                    decimalType =
+                            new DecimalType(
+                                    typeDefine.getPrecision().intValue(),
+                                    typeDefine.getScale() == null
+                                            ? 0
+                                            : 
typeDefine.getScale().intValue());
+                }
+                builder.dataType(decimalType);
+                builder.columnLength(Long.valueOf(decimalType.getPrecision()));
+                builder.scale(decimalType.getScale());
+                break;
+            case MYSQL_DECIMAL_UNSIGNED:
+                Preconditions.checkArgument(typeDefine.getPrecision() > 0);
+
+                log.warn("{} will probably cause value overflow.", 
MYSQL_DECIMAL_UNSIGNED);
+                DecimalType decimalUnsignedType =
+                        new DecimalType(
+                                typeDefine.getPrecision().intValue() + 1,
+                                typeDefine.getScale() == null
+                                        ? 0
+                                        : typeDefine.getScale().intValue());
+                builder.dataType(decimalUnsignedType);
+                
builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision()));
+                builder.scale(decimalUnsignedType.getScale());
+                break;
+            case MYSQL_ENUM:
+                builder.dataType(BasicType.STRING_TYPE);
+                if (typeDefine.getLength() == null || typeDefine.getLength() 
<= 0) {
+                    builder.columnLength(100L);
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                break;
+            case MYSQL_CHAR:
+            case MYSQL_VARCHAR:
+                if (typeDefine.getLength() == null || typeDefine.getLength() 
<= 0) {
+                    
builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+            case MYSQL_TINYTEXT:
+                builder.dataType(BasicType.STRING_TYPE);
+                builder.columnLength(POWER_2_8 - 1);
+                break;
+            case MYSQL_TEXT:
+                builder.dataType(BasicType.STRING_TYPE);
+                builder.columnLength(POWER_2_16 - 1);
+                break;
+            case MYSQL_MEDIUMTEXT:
+                builder.dataType(BasicType.STRING_TYPE);
+                builder.columnLength(POWER_2_24 - 1);
+                break;
+            case MYSQL_LONGTEXT:
+                builder.dataType(BasicType.STRING_TYPE);
+                builder.columnLength(POWER_2_32 - 1);
+                break;
+            case MYSQL_JSON:
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+            case MYSQL_BINARY:
+            case MYSQL_VARBINARY:
+                if (typeDefine.getLength() == null || typeDefine.getLength() 
<= 0) {
+                    builder.columnLength(1L);
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                break;
+            case MYSQL_TINYBLOB:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(POWER_2_8 - 1);
+                break;
+            case MYSQL_BLOB:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(POWER_2_16 - 1);
+                break;
+            case MYSQL_MEDIUMBLOB:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(POWER_2_24 - 1);
+                break;
+            case MYSQL_LONGBLOB:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(POWER_2_32 - 1);
+                break;
+            case MYSQL_GEOMETRY:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                break;
+            case MYSQL_DATE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+                break;
+            case MYSQL_TIME:
+                builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
+                builder.scale(typeDefine.getScale());
+                break;
+            case MYSQL_DATETIME:
+            case MYSQL_TIMESTAMP:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(typeDefine.getScale());
+                break;
+            default:
+                throw CommonError.convertToSeaTunnelTypeError(
+                        DatabaseIdentifier.OCENABASE, mysqlDataType, 
typeDefine.getName());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public BasicTypeDefine<OceanBaseMysqlType> reconvert(Column column) {
+        BasicTypeDefine.BasicTypeDefineBuilder builder =
+                BasicTypeDefine.<OceanBaseMysqlType>builder()
+                        .name(column.getName())
+                        .nullable(column.isNullable())
+                        .comment(column.getComment())
+                        .defaultValue(column.getDefaultValue());
+        switch (column.getDataType().getSqlType()) {
+            case NULL:
+                builder.nativeType(OceanBaseMysqlType.NULL);
+                builder.columnType(MYSQL_NULL);
+                builder.dataType(MYSQL_NULL);
+                break;
+            case BOOLEAN:
+                builder.nativeType(OceanBaseMysqlType.BOOLEAN);
+                builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1));
+                builder.dataType(MYSQL_TINYINT);
+                builder.length(1L);
+                break;
+            case TINYINT:
+                builder.nativeType(OceanBaseMysqlType.TINYINT);
+                builder.columnType(MYSQL_TINYINT);
+                builder.dataType(MYSQL_TINYINT);
+                break;
+            case SMALLINT:
+                builder.nativeType(OceanBaseMysqlType.SMALLINT);
+                builder.columnType(MYSQL_SMALLINT);
+                builder.dataType(MYSQL_SMALLINT);
+                break;
+            case INT:
+                builder.nativeType(OceanBaseMysqlType.INT);
+                builder.columnType(MYSQL_INT);
+                builder.dataType(MYSQL_INT);
+                break;
+            case BIGINT:
+                builder.nativeType(OceanBaseMysqlType.BIGINT);
+                builder.columnType(MYSQL_BIGINT);
+                builder.dataType(MYSQL_BIGINT);
+                break;
+            case FLOAT:
+                builder.nativeType(OceanBaseMysqlType.FLOAT);
+                builder.columnType(MYSQL_FLOAT);
+                builder.dataType(MYSQL_FLOAT);
+                break;
+            case DOUBLE:
+                builder.nativeType(OceanBaseMysqlType.DOUBLE);
+                builder.columnType(MYSQL_DOUBLE);
+                builder.dataType(MYSQL_DOUBLE);
+                break;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) column.getDataType();
+                long precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                if (precision <= 0) {
+                    precision = DEFAULT_PRECISION;
+                    scale = DEFAULT_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which is precision less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (precision > MAX_PRECISION) {
+                    scale = (int) Math.max(0, scale - (precision - 
MAX_PRECISION));
+                    precision = MAX_PRECISION;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which exceeds the maximum precision of 
{}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            MAX_PRECISION,
+                            precision,
+                            scale);
+                }
+                if (scale < 0) {
+                    scale = 0;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which is scale less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (scale > MAX_SCALE) {
+                    scale = MAX_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which exceeds the maximum scale of {}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            MAX_SCALE,
+                            precision,
+                            scale);
+                }
+
+                builder.nativeType(OceanBaseMysqlType.DECIMAL);
+                builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL, 
precision, scale));
+                builder.dataType(MYSQL_DECIMAL);
+                builder.precision(precision);
+                builder.scale(scale);
+                break;
+            case BYTES:
+                if (column.getColumnLength() == null || 
column.getColumnLength() <= 0) {
+                    builder.nativeType(OceanBaseMysqlType.VARBINARY);
+                    builder.columnType(
+                            String.format("%s(%s)", MYSQL_VARBINARY, 
MAX_VARBINARY_LENGTH / 2));
+                    builder.dataType(MYSQL_VARBINARY);
+                } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) {
+                    builder.nativeType(OceanBaseMysqlType.VARBINARY);
+                    builder.columnType(
+                            String.format("%s(%s)", MYSQL_VARBINARY, 
column.getColumnLength()));
+                    builder.dataType(MYSQL_VARBINARY);
+                } else if (column.getColumnLength() < POWER_2_24) {
+                    builder.nativeType(OceanBaseMysqlType.MEDIUMBLOB);
+                    builder.columnType(MYSQL_MEDIUMBLOB);
+                    builder.dataType(MYSQL_MEDIUMBLOB);
+                } else {
+                    builder.nativeType(OceanBaseMysqlType.LONGBLOB);
+                    builder.columnType(MYSQL_LONGBLOB);
+                    builder.dataType(MYSQL_LONGBLOB);
+                }
+                break;
+            case STRING:
+                if (column.getColumnLength() == null || 
column.getColumnLength() <= 0) {
+                    builder.nativeType(OceanBaseMysqlType.LONGTEXT);
+                    builder.columnType(MYSQL_LONGTEXT);
+                    builder.dataType(MYSQL_LONGTEXT);
+                } else if (column.getColumnLength() < POWER_2_8) {
+                    builder.nativeType(OceanBaseMysqlType.VARCHAR);
+                    builder.columnType(
+                            String.format("%s(%s)", MYSQL_VARCHAR, 
column.getColumnLength()));
+                    builder.dataType(MYSQL_VARCHAR);
+                } else if (column.getColumnLength() < POWER_2_16) {
+                    builder.nativeType(OceanBaseMysqlType.TEXT);
+                    builder.columnType(MYSQL_TEXT);
+                    builder.dataType(MYSQL_TEXT);
+                } else if (column.getColumnLength() < POWER_2_24) {
+                    builder.nativeType(OceanBaseMysqlType.MEDIUMTEXT);
+                    builder.columnType(MYSQL_MEDIUMTEXT);
+                    builder.dataType(MYSQL_MEDIUMTEXT);
+                } else {
+                    builder.nativeType(OceanBaseMysqlType.LONGTEXT);
+                    builder.columnType(MYSQL_LONGTEXT);
+                    builder.dataType(MYSQL_LONGTEXT);
+                }
+                break;
+            case DATE:
+                builder.nativeType(OceanBaseMysqlType.DATE);
+                builder.columnType(MYSQL_DATE);
+                builder.dataType(MYSQL_DATE);
+                break;
+            case TIME:
+                builder.nativeType(OceanBaseMysqlType.TIME);
+                builder.dataType(MYSQL_TIME);
+                if (column.getScale() != null && column.getScale() > 0) {
+                    int timeScale = column.getScale();
+                    if (timeScale > MAX_TIME_SCALE) {
+                        timeScale = MAX_TIME_SCALE;
+                        log.warn(
+                                "The time column {} type time({}) is out of 
range, "
+                                        + "which exceeds the maximum scale of 
{}, "
+                                        + "it will be converted to time({})",
+                                column.getName(),
+                                column.getScale(),
+                                MAX_SCALE,
+                                timeScale);
+                    }
+                    builder.columnType(String.format("%s(%s)", MYSQL_TIME, 
timeScale));
+                    builder.scale(timeScale);
+                } else {
+                    builder.columnType(MYSQL_TIME);
+                }
+                break;
+            case TIMESTAMP:
+                builder.nativeType(OceanBaseMysqlType.DATETIME);
+                builder.dataType(MYSQL_DATETIME);
+                if (column.getScale() != null && column.getScale() > 0) {
+                    int timestampScale = column.getScale();
+                    if (timestampScale > MAX_TIMESTAMP_SCALE) {
+                        timestampScale = MAX_TIMESTAMP_SCALE;
+                        log.warn(
+                                "The timestamp column {} type timestamp({}) is 
out of range, "
+                                        + "which exceeds the maximum scale of 
{}, "
+                                        + "it will be converted to 
timestamp({})",
+                                column.getName(),
+                                column.getScale(),
+                                MAX_TIMESTAMP_SCALE,
+                                timestampScale);
+                    }
+                    builder.columnType(String.format("%s(%s)", MYSQL_DATETIME, 
timestampScale));
+                    builder.scale(timestampScale);
+                } else {
+                    builder.columnType(MYSQL_DATETIME);
+                }
+                break;
+            default:
+                throw CommonError.convertToConnectorTypeError(
+                        DatabaseIdentifier.OCENABASE,
+                        column.getDataType().getSqlType().name(),
+                        column.getName());
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java
new file mode 100644
index 0000000000..e4d6e8b973
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+public class OceanBaseMySqlTypeMapper implements JdbcDialectTypeMapper {
+
+    private OceanBaseMySqlTypeConverter typeConverter;
+
+    public OceanBaseMySqlTypeMapper() {
+        this.typeConverter = new OceanBaseMySqlTypeConverter();
+    }
+
+    public OceanBaseMySqlTypeMapper(OceanBaseMySqlTypeConverter typeConverter) 
{
+        this.typeConverter = typeConverter;
+    }
+
+    @Override
+    public Column mappingColumn(BasicTypeDefine typeDefine) {
+        return typeConverter.convert(typeDefine);
+    }
+
+    @Override
+    public Column mappingColumn(ResultSetMetaData metadata, int colIndex) 
throws SQLException {
+        String columnName = metadata.getColumnLabel(colIndex);
+        // e.g. tinyint unsigned
+        String nativeType = metadata.getColumnTypeName(colIndex);
+        int isNullable = metadata.isNullable(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        if (Arrays.asList("CHAR", "VARCHAR", "ENUM").contains(nativeType)) {
+            long octetLength = TypeDefineUtils.charTo4ByteLength((long) 
precision);
+            precision = (int) Math.max(precision, octetLength);
+        }
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(columnName)
+                        .columnType(nativeType)
+                        .dataType(nativeType)
+                        .nullable(isNullable == 
ResultSetMetaData.columnNullable)
+                        .length((long) precision)
+                        .precision((long) precision)
+                        .scale(scale)
+                        .build();
+        return mappingColumn(typeDefine);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
new file mode 100644
index 0000000000..83d3220b12
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class OceanBaseMysqlDialect implements JdbcDialect {
+
+    private static final List NOT_SUPPORTED_DEFAULT_VALUES =
+            Arrays.asList(
+                    OceanBaseMysqlType.BLOB,
+                    OceanBaseMysqlType.TEXT,
+                    OceanBaseMysqlType.JSON,
+                    OceanBaseMysqlType.GEOMETRY);
+
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public OceanBaseMysqlDialect() {}
+
+    public OceanBaseMysqlDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.OCENABASE;
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new OceanBaseMysqlJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new OceanBaseMySqlTypeMapper();
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return "`" + getFieldIde(identifier, fieldIde) + "`";
+    }
+
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
+        String updateClause =
+                Arrays.stream(fieldNames)
+                        .map(
+                                fieldName ->
+                                        quoteIdentifier(fieldName)
+                                                + "=VALUES("
+                                                + quoteIdentifier(fieldName)
+                                                + ")")
+                        .collect(Collectors.joining(", "));
+        String upsertSQL =
+                getInsertIntoStatement(database, tableName, fieldNames)
+                        + " ON DUPLICATE KEY UPDATE "
+                        + updateClause;
+        return Optional.of(upsertSQL);
+    }
+
+    @Override
+    public PreparedStatement creatPreparedStatement(
+            Connection connection, String queryTemplate, int fetchSize) throws 
SQLException {
+        PreparedStatement statement =
+                connection.prepareStatement(
+                        queryTemplate, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+        statement.setFetchSize(Integer.MIN_VALUE);
+        return statement;
+    }
+
+    @Override
+    public String extractTableName(TablePath tablePath) {
+        return tablePath.getTableName();
+    }
+
+    @Override
+    public Map<String, String> defaultParameter() {
+        HashMap<String, String> map = new HashMap<>();
+        map.put("rewriteBatchedStatements", "true");
+        return map;
+    }
+
+    @Override
+    public TablePath parse(String tablePath) {
+        return TablePath.of(tablePath, false);
+    }
+
+    @Override
+    public Object[] sampleDataFromColumn(
+            Connection connection,
+            JdbcSourceTable table,
+            String columnName,
+            int samplingRate,
+            int fetchSize)
+            throws Exception {
+        String sampleQuery;
+        if (StringUtils.isNotBlank(table.getQuery())) {
+            sampleQuery =
+                    String.format(
+                            "SELECT %s FROM (%s) AS T",
+                            quoteIdentifier(columnName), table.getQuery());
+        } else {
+            sampleQuery =
+                    String.format(
+                            "SELECT %s FROM %s",
+                            quoteIdentifier(columnName), 
tableIdentifier(table.getTablePath()));
+        }
+
+        try (Statement stmt =
+                connection.createStatement(
+                        ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY)) {
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
+                int count = 0;
+                List<Object> results = new ArrayList<>();
+
+                while (rs.next()) {
+                    count++;
+                    if (count % samplingRate == 0) {
+                        results.add(rs.getObject(1));
+                    }
+                    if (Thread.currentThread().isInterrupted()) {
+                        throw new InterruptedException("Thread interrupted");
+                    }
+                }
+                Object[] resultsArray = results.toArray();
+                Arrays.sort(resultsArray);
+                return resultsArray;
+            }
+        }
+    }
+
+    @Override
+    public Long approximateRowCntStatement(Connection connection, 
JdbcSourceTable table)
+            throws SQLException {
+
+        // 1. If no query is configured, use TABLE STATUS.
+        // 2. If a query is configured but does not contain a WHERE clause and 
tablePath is
+        // configured , use TABLE STATUS.
+        // 3. If a query is configured with a WHERE clause, or a query 
statement is configured but
+        // tablePath is TablePath.DEFAULT, use COUNT(*).
+
+        boolean useTableStats =
+                StringUtils.isBlank(table.getQuery())
+                        || (!table.getQuery().toLowerCase().contains("where")
+                                && table.getTablePath() != null
+                                && !TablePath.DEFAULT
+                                        .getFullName()
+                                        
.equals(table.getTablePath().getFullName()));
+
+        if (useTableStats) {
+            // The statement used to get approximate row count which is less
+            // accurate than COUNT(*), but is more efficient for large table.
+            TablePath tablePath = table.getTablePath();
+            String useDatabaseStatement =
+                    String.format("USE %s;", 
quoteDatabaseIdentifier(tablePath.getDatabaseName()));
+            String rowCountQuery =
+                    String.format("SHOW TABLE STATUS LIKE '%s';", 
tablePath.getTableName());
+
+            try (Statement stmt = connection.createStatement()) {
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
useDatabaseStatement);
+                stmt.execute(useDatabaseStatement);
+                log.info("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
+                try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
+                    if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
+                        throw new SQLException(
+                                String.format(
+                                        "No result returned after running 
query [%s]",
+                                        rowCountQuery));
+                    }
+                    return rs.getLong(5);
+                }
+            }
+        }
+
+        return SQLUtils.countForSubquery(connection, table.getQuery());
+    }
+
+    @Override
+    public void refreshTableSchemaBySchemaChangeEvent(
+            String sourceDialectName,
+            AlterTableColumnEvent event,
+            JdbcConnectionProvider refreshTableSchemaConnectionProvider,
+            TablePath sinkTablePath) {
+        try (Connection connection =
+                        
refreshTableSchemaConnectionProvider.getOrEstablishConnection();
+                Statement stmt = connection.createStatement()) {
+            String alterTableSql = generateAlterTableSql(sourceDialectName, 
event, sinkTablePath);
+            log.info("Apply schema change with sql: {}", alterTableSql);
+            stmt.execute(alterTableSql);
+        } catch (Exception e) {
+            throw new JdbcConnectorException(
+                    
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
+        }
+    }
+
+    @Override
+    public String decorateWithComment(String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
+        OceanBaseMysqlType nativeType = (OceanBaseMysqlType) 
typeBasicTypeDefine.getNativeType();
+        if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
+            return basicSql;
+        }
+        return JdbcDialect.super.decorateWithComment(basicSql, 
typeBasicTypeDefine);
+    }
+
+    @Override
+    public boolean needsQuotesWithDefaultValue(String sqlType) {
+        OceanBaseMysqlType mysqlType = OceanBaseMysqlType.getByName(sqlType);
+        switch (mysqlType) {
+            case CHAR:
+            case VARCHAR:
+            case TEXT:
+            case TINYTEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case ENUM:
+            case SET:
+            case BLOB:
+            case TINYBLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case DATE:
+            case DATETIME:
+            case TIMESTAMP:
+            case TIME:
+            case YEAR:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    @Override
+    public boolean isSpecialDefaultValue(Object defaultValue) {
+        return MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
similarity index 50%
copy from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
copy to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index b3a456870c..2033518108 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -17,33 +17,26 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
 
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 
-import com.google.auto.service.AutoService;
-
-import javax.annotation.Nonnull;
-
-@AutoService(JdbcDialectFactory.class)
-public class OceanBaseDialectFactory implements JdbcDialectFactory {
-    @Override
-    public boolean acceptsURL(String url) {
-        return url.startsWith("jdbc:oceanbase:");
-    }
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 
+public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
     @Override
-    public JdbcDialect create() {
-        throw new UnsupportedOperationException(
-                "Can't create JdbcDialect without compatible mode for 
OceanBase");
+    public String converterName() {
+        return DatabaseIdentifier.OCENABASE;
     }
 
     @Override
-    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
-        if ("oracle".equalsIgnoreCase(compatibleMode)) {
-            return new OracleDialect();
-        }
-        return new MysqlDialect();
+    protected void writeTime(PreparedStatement statement, int index, LocalTime 
time)
+            throws SQLException {
+        // Write to time column using timestamp retains milliseconds
+        statement.setTimestamp(
+                index, 
java.sql.Timestamp.valueOf(LocalDateTime.of(LocalDate.now(), time)));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java
new file mode 100644
index 0000000000..01f8141c39
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.SQLType;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.LocalDateTime;
+
+public enum OceanBaseMysqlType implements SQLType {
+    DECIMAL(
+            "DECIMAL",
+            Types.DECIMAL,
+            BigDecimal.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            65L,
+            "[(M[,D])] [UNSIGNED] [ZEROFILL]"),
+
+    DECIMAL_UNSIGNED(
+            "DECIMAL UNSIGNED",
+            Types.DECIMAL,
+            BigDecimal.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            65L,
+            "[(M[,D])] [UNSIGNED] [ZEROFILL]"),
+
+    TINYINT(
+            "TINYINT",
+            Types.TINYINT,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            3L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    TINYINT_UNSIGNED(
+            "TINYINT UNSIGNED",
+            Types.TINYINT,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            3L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    BOOLEAN("BOOLEAN", Types.BOOLEAN, Boolean.class, 0, 
OceanBaseMysqlType.IS_NOT_DECIMAL, 3L, ""),
+
+    SMALLINT(
+            "SMALLINT",
+            Types.SMALLINT,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            5L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    SMALLINT_UNSIGNED(
+            "SMALLINT UNSIGNED",
+            Types.SMALLINT,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            5L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    INT(
+            "INT",
+            Types.INTEGER,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            10L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    INT_UNSIGNED(
+            "INT UNSIGNED",
+            Types.INTEGER,
+            Long.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            10L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    FLOAT(
+            "FLOAT",
+            Types.REAL,
+            Float.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            12L,
+            "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+
+    FLOAT_UNSIGNED(
+            "FLOAT UNSIGNED",
+            Types.REAL,
+            Float.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            12L,
+            "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+
+    DOUBLE(
+            "DOUBLE",
+            Types.DOUBLE,
+            Double.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            22L,
+            "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+
+    DOUBLE_UNSIGNED(
+            "DOUBLE UNSIGNED",
+            Types.DOUBLE,
+            Double.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            22L,
+            "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+    /** FIELD_TYPE_NULL = 6 */
+    NULL("NULL", Types.NULL, Object.class, 0, 
OceanBaseMysqlType.IS_NOT_DECIMAL, 0L, ""),
+
+    TIMESTAMP(
+            "TIMESTAMP",
+            Types.TIMESTAMP,
+            Timestamp.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            26L,
+            "[(fsp)]"),
+
+    BIGINT(
+            "BIGINT",
+            Types.BIGINT,
+            Long.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            19L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    BIGINT_UNSIGNED(
+            "BIGINT UNSIGNED",
+            Types.BIGINT,
+            BigInteger.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            20L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    MEDIUMINT(
+            "MEDIUMINT",
+            Types.INTEGER,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            7L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    MEDIUMINT_UNSIGNED(
+            "MEDIUMINT UNSIGNED",
+            Types.INTEGER,
+            Integer.class,
+            OceanBaseMysqlType.FIELD_FLAG_UNSIGNED | 
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+            OceanBaseMysqlType.IS_DECIMAL,
+            8L,
+            "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+    DATE("DATE", Types.DATE, Date.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 
10L, ""),
+
+    TIME("TIME", Types.TIME, Time.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 
16L, "[(fsp)]"),
+
+    DATETIME(
+            "DATETIME",
+            Types.TIMESTAMP,
+            LocalDateTime.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            26L,
+            "[(fsp)]"),
+
+    YEAR("YEAR", Types.DATE, Date.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 
4L, "[(4)]"),
+
+    VARCHAR(
+            "VARCHAR",
+            Types.VARCHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            65535L,
+            "(M) [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+    VARBINARY(
+            "VARBINARY",
+            Types.VARBINARY,
+            null,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            65535L,
+            "(M)"),
+
+    BIT("BIT", Types.BIT, Boolean.class, 0, OceanBaseMysqlType.IS_DECIMAL, 1L, 
"[(M)]"),
+
+    JSON(
+            "JSON",
+            Types.LONGVARCHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            1073741824L,
+            ""),
+
+    ENUM(
+            "ENUM",
+            Types.CHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            65535L,
+            "('value1','value2',...) [CHARACTER SET charset_name] [COLLATE 
collation_name]"),
+
+    SET(
+            "SET",
+            Types.CHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            64L,
+            "('value1','value2',...) [CHARACTER SET charset_name] [COLLATE 
collation_name]"),
+
+    TINYBLOB("TINYBLOB", Types.VARBINARY, null, 0, 
OceanBaseMysqlType.IS_NOT_DECIMAL, 255L, ""),
+
+    TINYTEXT(
+            "TINYTEXT",
+            Types.VARCHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            255L,
+            " [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+    MEDIUMBLOB(
+            "MEDIUMBLOB",
+            Types.LONGVARBINARY,
+            null,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            16777215L,
+            ""),
+
+    MEDIUMTEXT(
+            "MEDIUMTEXT",
+            Types.LONGVARCHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            16777215L,
+            " [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+    LONGBLOB(
+            "LONGBLOB",
+            Types.LONGVARBINARY,
+            null,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            4294967295L,
+            ""),
+
+    LONGTEXT(
+            "LONGTEXT",
+            Types.LONGVARCHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            4294967295L,
+            " [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+    BLOB("BLOB", Types.LONGVARBINARY, null, 0, 
OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, "[(M)]"),
+
+    TEXT(
+            "TEXT",
+            Types.LONGVARCHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            65535L,
+            "[(M)] [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+    CHAR(
+            "CHAR",
+            Types.CHAR,
+            String.class,
+            0,
+            OceanBaseMysqlType.IS_NOT_DECIMAL,
+            255L,
+            "[(M)] [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+    BINARY("BINARY", Types.BINARY, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL, 
255L, "(M)"),
+
+    GEOMETRY("GEOMETRY", Types.BINARY, null, 0, 
OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, ""),
+    // is represented by BLOB
+    UNKNOWN("UNKNOWN", Types.OTHER, null, 0, 
OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, "");
+
+    private final String name;
+    protected int jdbcType;
+    protected final Class<?> javaClass;
+    private final int flagsMask;
+    private final boolean isDecimal;
+    private final Long precision;
+    private final String createParams;
+
+    private OceanBaseMysqlType(
+            String oceanBaseMysqlTypeName,
+            int jdbcType,
+            Class<?> javaClass,
+            int allowedFlags,
+            boolean isDec,
+            Long precision,
+            String createParams) {
+        this.name = oceanBaseMysqlTypeName;
+        this.jdbcType = jdbcType;
+        this.javaClass = javaClass;
+        this.flagsMask = allowedFlags;
+        this.isDecimal = isDec;
+        this.precision = precision;
+        this.createParams = createParams;
+    }
+
+    public static final int FIELD_FLAG_UNSIGNED = 32;
+    public static final int FIELD_FLAG_ZEROFILL = 64;
+
+    private static final boolean IS_DECIMAL = true;
+    private static final boolean IS_NOT_DECIMAL = false;
+
+    public static OceanBaseMysqlType getByName(String fullMysqlTypeName) {
+
+        String typeName = "";
+
+        if (fullMysqlTypeName.indexOf("(") != -1) {
+            typeName = fullMysqlTypeName.substring(0, 
fullMysqlTypeName.indexOf("(")).trim();
+        } else {
+            typeName = fullMysqlTypeName;
+        }
+
+        // the order of checks is important because some short names could 
match parts of longer
+        // names
+        if (StringUtils.indexOfIgnoreCase(typeName, "DECIMAL") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "DEC") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "NUMERIC") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "FIXED") != -1) {
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                    ? DECIMAL_UNSIGNED
+                    : DECIMAL;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYBLOB") != -1) {
+            // IMPORTANT: "TINYBLOB" must be checked before "TINY"
+            return TINYBLOB;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYTEXT") != -1) {
+            // IMPORTANT: "TINYTEXT" must be checked before "TINY"
+            return TINYTEXT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYINT") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "TINY") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "INT1") != -1) {
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? TINYINT_UNSIGNED
+                    : TINYINT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMINT") != -1
+                // IMPORTANT: "INT24" must be checked before "INT2"
+                || StringUtils.indexOfIgnoreCase(typeName, "INT24") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "INT3") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "MIDDLEINT") != -1) 
{
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? MEDIUMINT_UNSIGNED
+                    : MEDIUMINT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "SMALLINT") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "INT2") != -1) {
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? SMALLINT_UNSIGNED
+                    : SMALLINT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "BIGINT") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "SERIAL") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "INT8") != -1) {
+            // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT 
UNIQUE.
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? BIGINT_UNSIGNED
+                    : BIGINT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "POINT") != -1) {
+            // also covers "MULTIPOINT"
+            // IMPORTANT: "POINT" must be checked before "INT"
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "INT") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "INTEGER") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "INT4") != -1) {
+            // IMPORTANT: "INT" must be checked after all "*INT*" types
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? INT_UNSIGNED
+                    : INT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "DOUBLE") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "REAL") != -1
+                /* || StringUtils.indexOfIgnoreCase(name, "DOUBLE PRECISION") 
!= -1 is caught by "DOUBLE" check */
+                // IMPORTANT: "FLOAT8" must be checked before "FLOAT"
+                || StringUtils.indexOfIgnoreCase(typeName, "FLOAT8") != -1) {
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? DOUBLE_UNSIGNED
+                    : DOUBLE;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "FLOAT") != -1 /*
+         * || StringUtils.indexOfIgnoreCase(name, "FLOAT4") != -1 is caught by
+         * "FLOAT" check
+         */) {
+            return StringUtils.indexOfIgnoreCase(fullMysqlTypeName, 
"UNSIGNED") != -1
+                            || 
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+                    ? FLOAT_UNSIGNED
+                    : FLOAT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "NULL") != -1) {
+            return NULL;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "TIMESTAMP") != -1) 
{
+            // IMPORTANT: "TIMESTAMP" must be checked before "TIME"
+            return TIMESTAMP;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "DATETIME") != -1) {
+            // IMPORTANT: "DATETIME" must be checked before "DATE" and "TIME"
+            return DATETIME;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "DATE") != -1) {
+            return DATE;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "TIME") != -1) {
+            return TIME;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "YEAR") != -1) {
+            return YEAR;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "LONGBLOB") != -1) {
+            // IMPORTANT: "LONGBLOB" must be checked before "LONG" and "BLOB"
+            return LONGBLOB;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "LONGTEXT") != -1) {
+            // IMPORTANT: "LONGTEXT" must be checked before "LONG" and "TEXT"
+            return LONGTEXT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMBLOB") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "LONG VARBINARY") 
!= -1) {
+            // IMPORTANT: "MEDIUMBLOB" must be checked before "BLOB"
+            // IMPORTANT: "LONG VARBINARY" must be checked before "LONG" and 
"VARBINARY"
+            return MEDIUMBLOB;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMTEXT") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "LONG VARCHAR") != 
-1
+                || StringUtils.indexOfIgnoreCase(typeName, "LONG") != -1) {
+            // IMPORTANT: "MEDIUMTEXT" must be checked before "TEXT"
+            // IMPORTANT: "LONG VARCHAR" must be checked before "VARCHAR"
+            return MEDIUMTEXT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "VARCHAR") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "NVARCHAR") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "NATIONAL VARCHAR") 
!= -1
+                || StringUtils.indexOfIgnoreCase(typeName, "CHARACTER 
VARYING") != -1) {
+            // IMPORTANT: "CHARACTER VARYING" must be checked before 
"CHARACTER" and "CHAR"
+            return VARCHAR;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "VARBINARY") != -1) 
{
+            return VARBINARY;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "BINARY") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "CHAR BYTE") != -1) 
{
+            // IMPORTANT: "BINARY" must be checked after all "*BINARY" types
+            // IMPORTANT: "CHAR BYTE" must be checked before "CHAR"
+            return BINARY;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "LINESTRING") != 
-1) {
+            // also covers "MULTILINESTRING"
+            // IMPORTANT: "LINESTRING" must be checked before "STRING"
+            return GEOMETRY;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "STRING") != -1
+                // IMPORTANT: "CHAR" must be checked after all "*CHAR*" types
+                || StringUtils.indexOfIgnoreCase(typeName, "CHAR") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "NCHAR") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "NATIONAL CHAR") != 
-1
+                || StringUtils.indexOfIgnoreCase(typeName, "CHARACTER") != -1) 
{
+            return CHAR;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "BOOLEAN") != -1
+                || StringUtils.indexOfIgnoreCase(typeName, "BOOL") != -1) {
+            return BOOLEAN;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "BIT") != -1) {
+            return BIT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "JSON") != -1) {
+            return JSON;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "ENUM") != -1) {
+            return ENUM;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "SET") != -1) {
+            return SET;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "BLOB") != -1) {
+            return BLOB;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "TEXT") != -1) {
+            return TEXT;
+
+        } else if (StringUtils.indexOfIgnoreCase(typeName, "GEOM")
+                        != -1 // covers "GEOMETRY", "GEOMETRYCOLLECTION" and 
"GEOMCOLLECTION"
+                || StringUtils.indexOfIgnoreCase(typeName, "POINT")
+                        != -1 // also covers "MULTIPOINT"
+                || StringUtils.indexOfIgnoreCase(typeName, "POLYGON")
+                        != -1 // also covers "MULTIPOLYGON"
+        ) {
+            return GEOMETRY;
+        }
+
+        return UNKNOWN;
+    }
+
+    @Override
+    public String getVendor() {
+        return "com.oceanbase";
+    }
+
+    @Override
+    public Integer getVendorTypeNumber() {
+        return this.jdbcType;
+    }
+
+    @Override
+    public String getName() {
+        return this.name;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index a689632206..860131041a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -393,6 +393,8 @@ public class JdbcCatalogUtils {
                 .ifPresent(val -> 
catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val));
         config.getPassword()
                 .ifPresent(val -> 
catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
+        Optional.ofNullable(config.getCompatibleMode())
+                .ifPresent(val -> 
catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val));
         return ReadonlyConfig.fromMap(catalogConfig);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
index 3208473d61..a747058391 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -66,16 +66,10 @@ public class JdbcOceanBaseMysqlIT extends 
JdbcOceanBaseITBase {
                                 "bash",
                                 "-c",
                                 "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && wget "
-                                        + driverUrl()
-                                        + " && wget "
-                                        + mysqlDriverUrl());
+                                        + driverUrl());
                 Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
             };
 
-    String mysqlDriverUrl() {
-        return 
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";;
-    }
-
     @Override
     List<String> configFile() {
         return 
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");


Reply via email to