This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 058f5594a3 [Improve][mysql-cdc] Support mysql 5.5 versions (#6710)
058f5594a3 is described below

commit 058f5594a3691f3f187c63a5d3bcb1a0e29b6a53
Author: hailin0 <[email protected]>
AuthorDate: Wed Apr 24 19:32:29 2024 +0800

    [Improve][mysql-cdc] Support mysql 5.5 versions (#6710)
---
 docs/en/connector-v2/source/MySQL-CDC.md           |  23 ++-
 .../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java  |   2 +-
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java |  32 ++-
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |  20 +-
 .../jdbc/catalog/mysql/MysqlDataTypeConvertor.java |   5 +-
 .../internal/dialect/mysql/MySqlTypeConverter.java |  17 +-
 .../internal/dialect/mysql/MySqlTypeMapper.java    |  12 +-
 .../jdbc/internal/dialect/mysql/MySqlVersion.java  |  59 ++++++
 .../sql/MysqlCreateTableSqlBuilderTest.java        |   4 +-
 .../dialect/mysql/MySqlTypeConverterTest.java      | 216 ++++++++++++++-------
 10 files changed, 298 insertions(+), 92 deletions(-)

diff --git a/docs/en/connector-v2/source/MySQL-CDC.md 
b/docs/en/connector-v2/source/MySQL-CDC.md
index aac7925dbd..2509262709 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -23,9 +23,9 @@ describes how to set up the MySQL CDC connector to run SQL 
queries against MySQL
 
 ## Supported DataSource Info
 
-| Datasource |                                                               
Supported versions                                                              
  |          Driver          |               Url                |               
                 Maven                                 |
-|------------|-------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
-| MySQL      | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x 
</li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 
8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
+| Datasource |                                                                 
 Supported versions                                                             
     |          Driver          |               Url                |            
                    Maven                                 |
+|------------|------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
+| MySQL      | <li> [MySQL](https://dev.mysql.com/doc): 5.5, 5.6, 5.7, 8.0.x 
</li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 
8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
 
 ## Using Dependency
 
@@ -92,9 +92,11 @@ server-id         = 223344
 log_bin           = mysql-bin
 expire_logs_days  = 10
 binlog_format     = row
+# MySQL 5.6+ requires binlog_row_image to be set to FULL
 binlog_row_image  = FULL
 
 # enable gtid mode
+# MySQL 5.6+ requires gtid_mode to be set to ON
 gtid_mode = on
 enforce_gtid_consistency = on
 ```
@@ -107,6 +109,21 @@ enforce_gtid_consistency = on
 
 4. Confirm your changes by checking the binlog status once more:
 
+MySQL 5.5:
+
+```sql
+mysql> show variables where variable_name in ('log_bin', 'binlog_format', 
'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
++--------------------------+----------------+
+| Variable_name            | Value          |
++--------------------------+----------------+
+| binlog_format            | ROW            |
+| log_bin                  | ON             |
++--------------------------+----------------+
+2 rows in set (0.00 sec)
+```
+
+MySQL 5.6+:
+
 ```sql
 mysql> show variables where variable_name in ('log_bin', 'binlog_format', 
'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
 +--------------------------+----------------+
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
index ffc782a840..86597b350a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -92,6 +92,6 @@ public class MySqlTypeUtils {
             default:
                 break;
         }
-        return MySqlTypeConverter.INSTANCE.convert(builder.build());
+        return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 3be0a83c85..6b263b0fd4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;
 
 import com.google.common.base.Preconditions;
 import com.mysql.cj.MysqlType;
@@ -39,6 +40,7 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
@@ -56,9 +58,14 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         SYS_DATABASES.add("sys");
     }
 
+    private MySqlVersion version;
+    private MySqlTypeConverter typeConverter;
+
     public MySqlCatalog(
             String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
         super(catalogName, username, pwd, urlInfo, null);
+        this.version = resolveVersion();
+        this.typeConverter = new MySqlTypeConverter(version);
     }
 
     @Override
@@ -130,7 +137,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         // e.g. `varchar(10)` is 40
         long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
         // e.g. `timestamp(3)` is 3
-        int timePrecision = resultSet.getInt("DATETIME_PRECISION");
+        int timePrecision =
+                MySqlVersion.V_5_5.equals(version) ? 0 : 
resultSet.getInt("DATETIME_PRECISION");
 
         Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 
0));
         Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));
@@ -152,12 +160,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                         .defaultValue(defaultValue)
                         .comment(comment)
                         .build();
-        return MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        return typeConverter.convert(typeDefine);
     }
 
     @Override
     protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
-        return MysqlCreateTableSqlBuilder.builder(tablePath, 
table).build(table.getCatalogName());
+        return MysqlCreateTableSqlBuilder.builder(tablePath, table, 
typeConverter)
+                .build(table.getCatalogName());
     }
 
     @Override
@@ -179,7 +188,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
         Connection defaultConnection = getConnection(defaultUrl);
-        return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
MySqlTypeMapper());
+        return CatalogUtils.getCatalogTable(
+                defaultConnection, sqlQuery, new 
MySqlTypeMapper(typeConverter));
     }
 
     @Override
@@ -193,4 +203,18 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                 "SELECT * FROM `%s`.`%s` LIMIT 1;",
                 tablePath.getDatabaseName(), tablePath.getTableName());
     }
+
+    private MySqlVersion resolveVersion() {
+        try (Statement statement = getConnection(defaultUrl).createStatement();
+                ResultSet resultSet = statement.executeQuery("SELECT 
VERSION()")) {
+            resultSet.next();
+            return MySqlVersion.parse(resultSet.getString(1));
+        } catch (Exception e) {
+            log.info(
+                    "Failed to get mysql version, fallback to default version: 
{}",
+                    MySqlVersion.V_5_7,
+                    e);
+            return MySqlVersion.V_5_7;
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index efa46f6815..a35767640f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.SqlType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
@@ -57,20 +58,23 @@ public class MysqlCreateTableSqlBuilder {
 
     private String fieldIde;
 
-    private MysqlCreateTableSqlBuilder(String tableName) {
+    private final MySqlTypeConverter typeConverter;
+
+    private MysqlCreateTableSqlBuilder(String tableName, MySqlTypeConverter 
typeConverter) {
         checkNotNull(tableName, "tableName must not be null");
         this.tableName = tableName;
+        this.typeConverter = typeConverter;
     }
 
     public static MysqlCreateTableSqlBuilder builder(
-            TablePath tablePath, CatalogTable catalogTable) {
+            TablePath tablePath, CatalogTable catalogTable, MySqlTypeConverter 
typeConverter) {
         checkNotNull(tablePath, "tablePath must not be null");
         checkNotNull(catalogTable, "catalogTable must not be null");
 
         TableSchema tableSchema = catalogTable.getTableSchema();
         checkNotNull(tableSchema, "tableSchema must not be null");
 
-        return new MysqlCreateTableSqlBuilder(tablePath.getTableName())
+        return new MysqlCreateTableSqlBuilder(tablePath.getTableName(), 
typeConverter)
                 .comment(catalogTable.getComment())
                 // todo: set charset and collate
                 .engine(null)
@@ -167,10 +171,16 @@ public class MysqlCreateTableSqlBuilder {
         final List<String> columnSqls = new ArrayList<>();
         columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), 
fieldIde, "`"));
         boolean isSupportDef = true;
-        if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
+
+        if ((SqlType.TIME.equals(column.getDataType().getSqlType())
+                        || 
SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
+                && column.getScale() != null) {
+            BasicTypeDefine<MysqlType> typeDefine = 
typeConverter.reconvert(column);
+            columnSqls.add(typeDefine.getColumnType());
+        } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
             columnSqls.add(column.getSourceType());
         } else {
-            BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+            BasicTypeDefine<MysqlType> typeDefine = 
typeConverter.reconvert(column);
             columnSqls.add(typeDefine.getColumnType());
         }
         // nullable
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
index 48d6fc4a6d..d0a1fe7df0 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
@@ -102,7 +102,7 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
                         .scale(scale)
                         .build();
 
-        return MySqlTypeConverter.INSTANCE.convert(typeDefine).getDataType();
+        return 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine).getDataType();
     }
 
     @Override
@@ -122,7 +122,8 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
                         .nullable(true)
                         .build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         return typeDefine.getNativeType();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
index 77180cef37..2c74845edc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
@@ -100,7 +100,14 @@ public class MySqlTypeConverter implements 
TypeConverter<BasicTypeDefine<MysqlTy
     public static final long POWER_2_24 = (long) Math.pow(2, 24);
     public static final long POWER_2_32 = (long) Math.pow(2, 32);
     public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
-    public static final MySqlTypeConverter INSTANCE = new MySqlTypeConverter();
+    public static final MySqlTypeConverter DEFAULT_INSTANCE =
+            new MySqlTypeConverter(MySqlVersion.V_5_7);
+
+    private final MySqlVersion version;
+
+    public MySqlTypeConverter(MySqlVersion version) {
+        this.version = version;
+    }
 
     @Override
     public String identifier() {
@@ -462,7 +469,9 @@ public class MySqlTypeConverter implements 
TypeConverter<BasicTypeDefine<MysqlTy
             case TIME:
                 builder.nativeType(MysqlType.TIME);
                 builder.dataType(MYSQL_TIME);
-                if (column.getScale() != null && column.getScale() > 0) {
+                if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
+                    builder.columnType(MYSQL_TIME);
+                } else if (column.getScale() != null && column.getScale() > 0) 
{
                     int timeScale = column.getScale();
                     if (timeScale > MAX_TIME_SCALE) {
                         timeScale = MAX_TIME_SCALE;
@@ -484,7 +493,9 @@ public class MySqlTypeConverter implements 
TypeConverter<BasicTypeDefine<MysqlTy
             case TIMESTAMP:
                 builder.nativeType(MysqlType.DATETIME);
                 builder.dataType(MYSQL_DATETIME);
-                if (column.getScale() != null && column.getScale() > 0) {
+                if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
+                    builder.columnType(MYSQL_DATETIME);
+                } else if (column.getScale() != null && column.getScale() > 0) 
{
                     int timestampScale = column.getScale();
                     if (timestampScale > MAX_TIMESTAMP_SCALE) {
                         timestampScale = MAX_TIMESTAMP_SCALE;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index 7c574e4d00..92a2765d8e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -28,9 +28,19 @@ import java.util.Arrays;
 
 public class MySqlTypeMapper implements JdbcDialectTypeMapper {
 
+    private MySqlTypeConverter typeConverter;
+
+    public MySqlTypeMapper() {
+        this(MySqlTypeConverter.DEFAULT_INSTANCE);
+    }
+
+    public MySqlTypeMapper(MySqlTypeConverter typeConverter) {
+        this.typeConverter = typeConverter;
+    }
+
     @Override
     public Column mappingColumn(BasicTypeDefine typeDefine) {
-        return MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        return typeConverter.convert(typeDefine);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlVersion.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlVersion.java
new file mode 100644
index 0000000000..6ffbf77356
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlVersion.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+public enum MySqlVersion {
+    V_5_5,
+    V_5_6,
+    V_5_7,
+    V_8;
+
+    public static MySqlVersion parse(String version) {
+        if (version != null) {
+            if (version.startsWith("5.5")) {
+                return V_5_5;
+            }
+            if (version.startsWith("5.6")) {
+                return V_5_6;
+            }
+            if (version.startsWith("5.7")) {
+                return V_5_7;
+            }
+            if (version.startsWith("8.0")) {
+                return V_8;
+            }
+        }
+        throw new UnsupportedOperationException("Unsupported MySQL version: " 
+ version);
+    }
+
+    public boolean isBefore(MySqlVersion version) {
+        return this.compareTo(version) < 0;
+    }
+
+    public boolean isAtOrBefore(MySqlVersion version) {
+        return this.compareTo(version) <= 0;
+    }
+
+    public boolean isAfter(MySqlVersion version) {
+        return this.compareTo(version) > 0;
+    }
+
+    public boolean isAtOrAfter(MySqlVersion version) {
+        return this.compareTo(version) >= 0;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
index 7505ffb089..2e7a72589f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -91,7 +92,8 @@ public class MysqlCreateTableSqlBuilderTest {
                         "User table");
 
         String createTableSql =
-                MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable)
+                MysqlCreateTableSqlBuilder.builder(
+                                tablePath, catalogTable, 
MySqlTypeConverter.DEFAULT_INSTANCE)
                         .build(DatabaseIdentifier.MYSQL);
         // create table sql is change; The old unit tests are no longer 
applicable
         String expect =
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverterTest.java
index bc48ca9cad..2e11212b31 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverterTest.java
@@ -40,7 +40,7 @@ public class MySqlTypeConverterTest {
         BasicTypeDefine<Object> typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("aaa").dataType("aaa").build();
         try {
-            MySqlTypeConverter.INSTANCE.convert(typeDefine);
+            MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
             Assertions.fail();
         } catch (SeaTunnelRuntimeException e) {
             // ignore
@@ -60,7 +60,7 @@ public class MySqlTypeConverterTest {
                         .defaultValue("null")
                         .comment("null")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.VOID_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -78,7 +78,7 @@ public class MySqlTypeConverterTest {
                         .dataType("bit")
                         .length(1L)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -90,7 +90,7 @@ public class MySqlTypeConverterTest {
                         .dataType("bit")
                         .length(9L)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(2, column.getColumnLength());
@@ -106,7 +106,7 @@ public class MySqlTypeConverterTest {
                         .dataType("tinyint")
                         .length(1L)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -118,7 +118,7 @@ public class MySqlTypeConverterTest {
                         .dataType("tinyint")
                         .length(2L)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.BYTE_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -129,7 +129,7 @@ public class MySqlTypeConverterTest {
                         .columnType("tinyint unsigned")
                         .dataType("tinyint unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -141,7 +141,7 @@ public class MySqlTypeConverterTest {
                         .dataType("tinyint")
                         .unsigned(true)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -155,7 +155,7 @@ public class MySqlTypeConverterTest {
                         .columnType("smallint")
                         .dataType("smallint")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -166,7 +166,7 @@ public class MySqlTypeConverterTest {
                         .columnType("smallint unsigned")
                         .dataType("smallint unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -180,7 +180,7 @@ public class MySqlTypeConverterTest {
                         .columnType("mediumint")
                         .dataType("mediumint")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -191,7 +191,7 @@ public class MySqlTypeConverterTest {
                         .columnType("mediumint unsigned")
                         .dataType("mediumint unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -201,7 +201,7 @@ public class MySqlTypeConverterTest {
     public void testConvertInt() {
         BasicTypeDefine<Object> typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("int").dataType("int").build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -212,7 +212,7 @@ public class MySqlTypeConverterTest {
                         .columnType("integer")
                         .dataType("integer")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -223,7 +223,7 @@ public class MySqlTypeConverterTest {
                         .columnType("int unsigned")
                         .dataType("int unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -234,7 +234,7 @@ public class MySqlTypeConverterTest {
                         .columnType("integer unsigned")
                         .dataType("integer unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -248,7 +248,7 @@ public class MySqlTypeConverterTest {
                         .columnType("bigint")
                         .dataType("bigint")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -259,7 +259,7 @@ public class MySqlTypeConverterTest {
                         .columnType("bigint unsigned")
                         .dataType("bigint unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(20, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -273,7 +273,7 @@ public class MySqlTypeConverterTest {
                         .columnType("float")
                         .dataType("float")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -284,7 +284,7 @@ public class MySqlTypeConverterTest {
                         .columnType("float unsigned")
                         .dataType("float unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -298,7 +298,7 @@ public class MySqlTypeConverterTest {
                         .columnType("double")
                         .dataType("double")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -309,7 +309,7 @@ public class MySqlTypeConverterTest {
                         .columnType("double unsigned")
                         .dataType("double unsigned")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -325,7 +325,7 @@ public class MySqlTypeConverterTest {
                         .precision(38L)
                         .scale(2)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 2), column.getDataType());
         Assertions.assertEquals(38, column.getColumnLength());
@@ -340,7 +340,7 @@ public class MySqlTypeConverterTest {
                         .precision(39L)
                         .scale(2)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(
                 new DecimalType(
@@ -356,7 +356,7 @@ public class MySqlTypeConverterTest {
                         .precision(38L)
                         .scale(2)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(39, 2), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -371,7 +371,7 @@ public class MySqlTypeConverterTest {
                         .dataType("enum")
                         .length(3L)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(3, column.getColumnLength());
@@ -387,7 +387,7 @@ public class MySqlTypeConverterTest {
                         .dataType("char")
                         .length(2L)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(2, column.getColumnLength());
@@ -400,7 +400,7 @@ public class MySqlTypeConverterTest {
                         .dataType("varchar")
                         .length(2L)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(2, column.getColumnLength());
@@ -415,7 +415,7 @@ public class MySqlTypeConverterTest {
                         .columnType("tinytext")
                         .dataType("tinytext")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(255, column.getColumnLength());
@@ -423,7 +423,7 @@ public class MySqlTypeConverterTest {
 
         typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("text").dataType("text").build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(65535, column.getColumnLength());
@@ -435,7 +435,7 @@ public class MySqlTypeConverterTest {
                         .columnType("mediumtext")
                         .dataType("mediumtext")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(16777215, column.getColumnLength());
@@ -447,7 +447,7 @@ public class MySqlTypeConverterTest {
                         .columnType("longtext")
                         .dataType("longtext")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(4294967295L, column.getColumnLength());
@@ -458,7 +458,7 @@ public class MySqlTypeConverterTest {
     public void testConvertJson() {
         BasicTypeDefine<Object> typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("json").dataType("json").build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
         Assertions.assertEquals(null, column.getColumnLength());
@@ -474,7 +474,7 @@ public class MySqlTypeConverterTest {
                         .dataType("binary")
                         .length(1L)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(1, column.getColumnLength());
@@ -487,7 +487,7 @@ public class MySqlTypeConverterTest {
                         .dataType("varbinary")
                         .length(1L)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(1, column.getColumnLength());
@@ -502,7 +502,7 @@ public class MySqlTypeConverterTest {
                         .columnType("tinyblob")
                         .dataType("tinyblob")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(255, column.getColumnLength());
@@ -510,7 +510,7 @@ public class MySqlTypeConverterTest {
 
         typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("blob").dataType("blob").build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(65535, column.getColumnLength());
@@ -522,7 +522,7 @@ public class MySqlTypeConverterTest {
                         .columnType("mediumblob")
                         .dataType("mediumblob")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(16777215, column.getColumnLength());
@@ -534,7 +534,7 @@ public class MySqlTypeConverterTest {
                         .columnType("longblob")
                         .dataType("longblob")
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(4294967295L, column.getColumnLength());
@@ -549,7 +549,7 @@ public class MySqlTypeConverterTest {
                         .columnType("geometry")
                         .dataType("geometry")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -559,7 +559,7 @@ public class MySqlTypeConverterTest {
     public void testConvertDate() {
         BasicTypeDefine<Object> typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("date").dataType("date").build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -574,7 +574,7 @@ public class MySqlTypeConverterTest {
                         .dataType("time")
                         .scale(3)
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
@@ -589,7 +589,7 @@ public class MySqlTypeConverterTest {
                         .columnType("datetime")
                         .dataType("datetime")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -601,7 +601,7 @@ public class MySqlTypeConverterTest {
                         .dataType("datetime")
                         .scale(3)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
@@ -616,7 +616,7 @@ public class MySqlTypeConverterTest {
                         .columnType("timestamp")
                         .dataType("timestamp")
                         .build();
-        Column column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = 
MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -628,7 +628,7 @@ public class MySqlTypeConverterTest {
                         .dataType("timestamp")
                         .scale(3)
                         .build();
-        column = MySqlTypeConverter.INSTANCE.convert(typeDefine);
+        column = MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
@@ -646,7 +646,7 @@ public class MySqlTypeConverterTest {
                         null,
                         null);
         try {
-            MySqlTypeConverter.INSTANCE.reconvert(column);
+            MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
             Assertions.fail();
         } catch (SeaTunnelRuntimeException e) {
             // ignore
@@ -660,7 +660,8 @@ public class MySqlTypeConverterTest {
         Column column =
                 PhysicalColumn.of("test", BasicType.VOID_TYPE, (Long) null, 
true, "null", "null");
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.NULL, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_NULL, 
typeDefine.getColumnType());
@@ -675,7 +676,8 @@ public class MySqlTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.BOOLEAN_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.BOOLEAN, typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -689,7 +691,8 @@ public class MySqlTypeConverterTest {
     public void testReconvertByte() {
         Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.TINYINT, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_TINYINT, 
typeDefine.getColumnType());
@@ -701,7 +704,8 @@ public class MySqlTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.SMALLINT, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_SMALLINT, 
typeDefine.getColumnType());
@@ -712,7 +716,8 @@ public class MySqlTypeConverterTest {
     public void testReconvertInt() {
         Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.INT, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_INT, 
typeDefine.getColumnType());
@@ -723,7 +728,8 @@ public class MySqlTypeConverterTest {
     public void testReconvertLong() {
         Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.BIGINT, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_BIGINT, 
typeDefine.getColumnType());
@@ -735,7 +741,8 @@ public class MySqlTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.FLOAT, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_FLOAT, 
typeDefine.getColumnType());
@@ -747,7 +754,8 @@ public class MySqlTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.DOUBLE, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_DOUBLE, 
typeDefine.getColumnType());
@@ -759,7 +767,8 @@ public class MySqlTypeConverterTest {
         Column column =
                 PhysicalColumn.builder().name("test").dataType(new 
DecimalType(0, 0)).build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.DECIMAL, typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -773,7 +782,7 @@ public class MySqlTypeConverterTest {
 
         column = PhysicalColumn.builder().name("test").dataType(new 
DecimalType(10, 2)).build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.DECIMAL, typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -791,7 +800,8 @@ public class MySqlTypeConverterTest {
                         .columnLength(null)
                         .build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.VARBINARY, 
typeDefine.getNativeType());
         Assertions.assertEquals("VARBINARY(32766)", 
typeDefine.getColumnType());
@@ -804,7 +814,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(255L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.VARBINARY, 
typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -820,7 +830,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(65535L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.MEDIUMBLOB, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_MEDIUMBLOB, 
typeDefine.getColumnType());
@@ -833,7 +843,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(16777215L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.MEDIUMBLOB, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_MEDIUMBLOB, 
typeDefine.getColumnType());
@@ -846,7 +856,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(4294967295L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.LONGBLOB, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_LONGBLOB, 
typeDefine.getColumnType());
@@ -862,7 +872,8 @@ public class MySqlTypeConverterTest {
                         .columnLength(null)
                         .build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.LONGTEXT, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_LONGTEXT, 
typeDefine.getColumnType());
@@ -875,7 +886,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(255L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.VARCHAR, typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -890,7 +901,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(65535L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.TEXT, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_TEXT, 
typeDefine.getColumnType());
@@ -903,7 +914,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(16777215L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.MEDIUMTEXT, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_MEDIUMTEXT, 
typeDefine.getColumnType());
@@ -916,7 +927,7 @@ public class MySqlTypeConverterTest {
                         .columnLength(4294967295L)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.LONGTEXT, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_LONGTEXT, 
typeDefine.getColumnType());
@@ -931,7 +942,8 @@ public class MySqlTypeConverterTest {
                         .dataType(LocalTimeType.LOCAL_DATE_TYPE)
                         .build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.DATE, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATE, 
typeDefine.getColumnType());
@@ -946,7 +958,8 @@ public class MySqlTypeConverterTest {
                         .dataType(LocalTimeType.LOCAL_TIME_TYPE)
                         .build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.TIME, typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_TIME, 
typeDefine.getColumnType());
@@ -959,7 +972,7 @@ public class MySqlTypeConverterTest {
                         .scale(3)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.TIME, typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -969,6 +982,35 @@ public class MySqlTypeConverterTest {
         Assertions.assertEquals(column.getScale(), typeDefine.getScale());
     }
 
+    @Test
+    public void testReconvertTimeForV55() { // reconvert() of LOCAL_TIME_TYPE using a converter built with MySqlVersion.V_5_5
+        MySqlTypeConverter typeConverter = new 
MySqlTypeConverter(MySqlVersion.V_5_5);
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_TIME_TYPE)
+                        .build(); // case 1: column built without a scale
+
+        BasicTypeDefine<MysqlType> typeDefine = 
typeConverter.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(MysqlType.TIME, typeDefine.getNativeType());
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_TIME, 
typeDefine.getColumnType());
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_TIME, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_TIME_TYPE)
+                        .scale(3)
+                        .build(); // case 2: same column, but with scale(3) set
+
+        typeDefine = typeConverter.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(MysqlType.TIME, typeDefine.getNativeType());
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_TIME, 
typeDefine.getColumnType()); // expects the same MYSQL_TIME column type as case 1, despite scale(3)
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_TIME, 
typeDefine.getDataType());
+    } // unlike case 2 of the non-V55 test, no assertion on typeDefine.getScale() is made here
+
     @Test
     public void testReconvertDatetime() {
         Column column =
@@ -977,7 +1019,8 @@ public class MySqlTypeConverterTest {
                         .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
                         .build();
 
-        BasicTypeDefine<MysqlType> typeDefine = 
MySqlTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine<MysqlType> typeDefine =
+                MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.DATETIME, 
typeDefine.getNativeType());
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATETIME, 
typeDefine.getColumnType());
@@ -990,7 +1033,7 @@ public class MySqlTypeConverterTest {
                         .scale(3)
                         .build();
 
-        typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(MysqlType.DATETIME, 
typeDefine.getNativeType());
         Assertions.assertEquals(
@@ -999,4 +1042,33 @@ public class MySqlTypeConverterTest {
         Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATETIME, 
typeDefine.getDataType());
         Assertions.assertEquals(column.getScale(), typeDefine.getScale());
     }
+
+    @Test
+    public void testReconvertDatetimeForV55() { // reconvert() of LOCAL_DATE_TIME_TYPE using a converter built with MySqlVersion.V_5_5
+        MySqlTypeConverter typeConverter = new 
MySqlTypeConverter(MySqlVersion.V_5_5);
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .build(); // case 1: column built without a scale
+
+        BasicTypeDefine<MysqlType> typeDefine = 
typeConverter.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(MysqlType.DATETIME, 
typeDefine.getNativeType());
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATETIME, 
typeDefine.getColumnType());
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATETIME, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .scale(3)
+                        .build(); // case 2: same column, but with scale(3) set
+
+        typeDefine = typeConverter.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(MysqlType.DATETIME, 
typeDefine.getNativeType());
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATETIME, 
typeDefine.getColumnType()); // expects the same MYSQL_DATETIME column type as case 1, despite scale(3)
+        Assertions.assertEquals(MySqlTypeConverter.MYSQL_DATETIME, 
typeDefine.getDataType());
+    } // unlike case 2 of the non-V55 test, no assertion on typeDefine.getScale() is made here
 }

Reply via email to