This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 80f392afbb [feature][connector-v2] add xugudb connector (#6561)
80f392afbb is described below

commit 80f392afbb654145ea3188ba51b1f4bafcd2ef5b
Author: L-Gryps <[email protected]>
AuthorDate: Tue Apr 2 16:42:52 2024 +0800

    [feature][connector-v2] add xugudb connector (#6561)
---
 .github/workflows/backend.yml                      |   2 +-
 docs/en/connector-v2/sink/Jdbc.md                  |   1 +
 docs/en/connector-v2/source/Jdbc.md                |   1 +
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |   7 +
 .../seatunnel/jdbc/catalog/xugu/XuguCatalog.java   | 266 +++++++++
 .../jdbc/catalog/xugu/XuguCatalogFactory.java      |  63 ++
 .../catalog/xugu/XuguCreateTableSqlBuilder.java    | 141 +++++
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |   1 +
 .../jdbc/internal/dialect/xugu/XuguDialect.java    | 231 ++++++++
 .../internal/dialect/xugu/XuguDialectFactory.java  |  45 ++
 .../dialect/xugu/XuguJdbcRowConverter.java         |  29 +
 .../internal/dialect/xugu/XuguTypeConverter.java   | 385 ++++++++++++
 .../jdbc/internal/dialect/xugu/XuguTypeMapper.java |  63 ++
 .../dialect/xugu/XuguTypeConverterTest.java        | 660 +++++++++++++++++++++
 .../src/main/assembly/assembly-bin-ci.xml          |   1 +
 .../connector-jdbc-e2e-part-7/pom.xml              |   5 +
 .../connectors/seatunnel/jdbc/JdbcXuguIT.java      | 246 ++++++++
 .../test/resources/jdbc_xugu_source_and_sink.conf  |  47 ++
 .../jdbc_xugu_source_and_upsert_sink.conf          |  48 ++
 19 files changed, 2241 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 519cf8533d..9975d477da 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -997,7 +997,7 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
-      - name: run jdbc connectors integration test (part-6)
+      - name: run jdbc connectors integration test (part-7)
         if: needs.changes.outputs.api == 'true'
         run: |
           ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false 
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl 
:connector-jdbc-e2e-part-7 -am -Pci
diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index f0b74414a4..c2591761ec 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -235,6 +235,7 @@ there are some reference value for params above.
 | Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
 | Kingbase   | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                          |
 | OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
+| xugu       | com.xugu.cloudjdbc.Driver                    | 
jdbc:xugu://localhost:5138                                         | /          
                                        | 
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar 
                            |
 
 ## Example
 
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 09c3ab636d..225576001d 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -227,6 +227,7 @@ there are some reference value for params above.
 | Kingbase   | com.kingbase8.Driver                                | 
jdbc:kingbase8://localhost:54321/db_test                               | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                          |
 | OceanBase  | com.oceanbase.jdbc.Driver                           | 
jdbc:oceanbase://localhost:2881                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
 | Hive       | org.apache.hive.jdbc.HiveDriver                     | 
jdbc:hive2://localhost:10000                                           | 
https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar
               |
+| xugu       | com.xugu.cloudjdbc.Driver                           | 
jdbc:xugu://localhost:5138                                             | 
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar 
                            |
 
 ## Example
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 5880036c90..db8c95dd0f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -50,6 +50,7 @@
         <kingbase8.version>8.6.0</kingbase8.version>
         <hive.jdbc.version>3.1.3</hive.jdbc.version>
         <oceanbase.jdbc.version>2.4.3</oceanbase.jdbc.version>
+        <xugu.jdbc.version>12.2.0</xugu.jdbc.version>
     </properties>
 
     <dependencyManagement>
@@ -188,6 +189,12 @@
                 <version>${oceanbase.jdbc.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>com.xugudb</groupId>
+                <artifactId>xugu-jdbc</artifactId>
+                <version>${xugu.jdbc.version}</version>
+                <scope>provided</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
new file mode 100644
index 0000000000..462e109c76
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class XuguCatalog extends AbstractJdbcCatalog {
+
+    protected static List<String> EXCLUDED_SCHEMAS =
+            Collections.unmodifiableList(Arrays.asList("GUEST", "SYSAUDITOR", 
"SYSSSO"));
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+            "SELECT\n"
+                    + "    dc.COLUMN_NAME,\n"
+                    + "    CASE\n"
+                    + "        WHEN dc.TYPE_NAME LIKE 'INTERVAL%%' THEN 
'INTERVAL' ELSE REGEXP_SUBSTR(dc.TYPE_NAME, '^[^(]+')\n"
+                    + "    END AS TYPE_NAME,\n"
+                    + "    dc.TYPE_NAME ||\n"
+                    + "    CASE\n"
+                    + "        WHEN dc.TYPE_NAME IN ('VARCHAR', 'CHAR') THEN 
'(' || dc.COLUMN_LENGTH || ')'\n"
+                    + "        WHEN dc.TYPE_NAME IN ('NUMERIC') AND 
dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NOT NULL THEN '(' || 
dc.COLUMN_PRECISION || ', ' || dc.COLUMN_SCALE || ')'\n"
+                    + "        WHEN dc.TYPE_NAME IN ('NUMERIC') AND 
dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NULL THEN '(' || 
dc.COLUMN_PRECISION || ')'\n"
+                    + "        WHEN dc.TYPE_NAME IN ('TIMESTAMP') THEN '(' || 
dc.COLUMN_SCALE || ')'\n"
+                    + "    END AS FULL_TYPE_NAME,\n"
+                    + "    dc.COLUMN_LENGTH,\n"
+                    + "    dc.COLUMN_PRECISION,\n"
+                    + "    dc.COLUMN_SCALE,\n"
+                    + "    dc.COLUMN_COMMENT,\n"
+                    + "    dc.DEFAULT_VALUE,\n"
+                    + "    CASE\n"
+                    + "        dc.IS_NULLABLE WHEN TRUE THEN 'NO' ELSE 'YES'\n"
+                    + "    END AS IS_NULLABLE\n"
+                    + "FROM\n"
+                    + "    (\n"
+                    + "    SELECT\n"
+                    + "        c.col_name AS COLUMN_NAME,\n"
+                    + "        CASE\n"
+                    + "            WHEN c.type_name = 'CHAR' AND c.\"VARYING\" 
= TRUE THEN 'VARCHAR'\n"
+                    + "            WHEN c.type_name = 'DATETIME' AND 
c.TIMESTAMP_T = 'i' THEN 'TIMESTAMP' ELSE c.type_name\n"
+                    + "        END AS TYPE_NAME,\n"
+                    + "        DECODE(c.type_name,\n"
+                    + "        'TINYINT', 1, 'SMALLINT', 2,\n"
+                    + "        'INTEGER', 4, 'BIGINT', 8,\n"
+                    + "        'FLOAT', 4, 'DOUBLE', 8,\n"
+                    + "        'NUMERIC', 17,\n"
+                    + "        'CHAR', DECODE(c.scale, -1, 60000, c.scale),\n"
+                    + "        'DATE', 4, 'DATETIME', 8,\n"
+                    + "        'TIMESTAMP', 8, 'DATETIME WITH TIME ZONE', 8,\n"
+                    + "        'TIME', 4, 'TIME WITH TIME ZONE', 4,\n"
+                    + "        'INTERVAL YEAR', 4, 'INTERVAL MONTH', 4,\n"
+                    + "        'INTERVAL DAY', 4, 'INTERVAL HOUR', 4,\n"
+                    + "        'INTERVAL MINUTE', 4, 'INTERVAL SECOND', 8,\n"
+                    + "        'INTERVAL YEAR TO MONTH', 4,\n"
+                    + "        'INTERVAL DAY TO HOUR', 4,\n"
+                    + "        'INTERVAL DAY TO MINUTE', 4,\n"
+                    + "        'INTERVAL DAY TO SECOND', 8,\n"
+                    + "        'INTERVAL HOUR TO MINUTE', 4,\n"
+                    + "        'INTERVAL HOUR TO SECOND', 8,\n"
+                    + "        'INTERVAL MINUTE TO SECOND', 8,\n"
+                    + "        'CLOB', 2147483648,\n"
+                    + "        'BLOB', 2147483648, 'BINARY', 2147483648,\n"
+                    + "        'GUID', 2, 'BOOLEAN', 1,\n"
+                    + "        'ROWVERSION', 8, 'ROWID', 10, NULL) AS 
COLUMN_LENGTH,\n"
+                    + "        DECODE(TRUNC(c.scale / 65536), 0, NULL, 
TRUNC(c.scale / 65536)::INTEGER) AS COLUMN_PRECISION,\n"
+                    + "        DECODE(DECODE(c.type_name, 'CHAR',-1, 
c.scale),-1, NULL, MOD(c.scale, 65536)) AS COLUMN_SCALE,\n"
+                    + "        c.comments AS COLUMN_COMMENT,\n"
+                    + "        c.DEF_VAL AS DEFAULT_VALUE,\n"
+                    + "        c.NOT_NULl AS IS_NULLABLE\n"
+                    + "    FROM\n"
+                    + "        dba_columns c\n"
+                    + "    LEFT JOIN dba_tables tab ON\n"
+                    + "        c.db_id = tab.db_id\n"
+                    + "        AND c.table_id = tab.table_id\n"
+                    + "    LEFT JOIN dba_schemas sc ON\n"
+                    + "        tab.schema_id = sc.schema_id\n"
+                    + "        AND tab.db_id = sc.db_id\n"
+                    + "    WHERE\n"
+                    + "        sc.schema_name = '%s'\n"
+                    + "        AND tab.table_name = '%s'\n"
+                    + ") AS dc \n";
+
+    public XuguCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String defaultSchema) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema);
+    }
+
+    @Override
+    protected String getListDatabaseSql() {
+        return "SELECT DB_NAME FROM dba_databases";
+    }
+
+    @Override
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable 
table) {
+        return new XuguCreateTableSqlBuilder(table).build(tablePath);
+    }
+
+    @Override
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format("DROP TABLE %s", 
tablePath.getSchemaAndTableName("\""));
+    }
+
+    @Override
+    protected String getCreateDatabaseSql(String databaseName) {
+        return String.format("CREATE DATABASE \"%s\"", databaseName);
+    }
+
+    @Override
+    protected String getDropDatabaseSql(String databaseName) {
+        return String.format("DROP DATABASE \"%s\"", databaseName);
+    }
+
+    @Override
+    protected String getListTableSql(String databaseName) {
+        return "SELECT user_name ,table_name FROM all_users au \n"
+                + "INNER JOIN all_tables at ON au.user_id=at.user_id AND 
au.db_id=at.db_id";
+    }
+
+    @Override
+    protected String getTableName(ResultSet rs) throws SQLException {
+        if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+            return null;
+        }
+        return rs.getString(1) + "." + rs.getString(2);
+    }
+
+    @Override
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
+        String columnName = resultSet.getString("COLUMN_NAME");
+        String typeName = resultSet.getString("TYPE_NAME");
+        String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
+        long columnLength = resultSet.getLong("COLUMN_LENGTH");
+        Long columnPrecision = resultSet.getObject("COLUMN_PRECISION", 
Long.class);
+        Integer columnScale = resultSet.getObject("COLUMN_SCALE", 
Integer.class);
+        String columnComment = resultSet.getString("COLUMN_COMMENT");
+        Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
+        boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(columnName)
+                        .columnType(fullTypeName)
+                        .dataType(typeName)
+                        .length(columnLength)
+                        .precision(columnPrecision)
+                        .scale(columnScale)
+                        .nullable(isNullable)
+                        .defaultValue(defaultValue)
+                        .comment(columnComment)
+                        .build();
+        return XuguTypeConverter.INSTANCE.convert(typeDefine);
+    }
+
+    @Override
+    protected String getUrlFromDatabaseName(String databaseName) {
+        return defaultUrl;
+    }
+
+    @Override
+    protected String getOptionTableName(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName();
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
+                return databaseExists(tablePath.getDatabaseName())
+                        && listTables(tablePath.getDatabaseName())
+                                .contains(tablePath.getSchemaAndTableName());
+            }
+            return listTables().contains(tablePath.getSchemaAndTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    private List<String> listTables() {
+        List<String> databases = listDatabases();
+        return listTables(databases.get(0));
+    }
+
+    @Override
+    public CatalogTable getTable(String sqlQuery) throws SQLException {
+        Connection defaultConnection = getConnection(defaultUrl);
+        return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
XuguTypeMapper());
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        return String.format(
+                "TRUNCATE TABLE \"%s\".\"%s\"",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
+    @Override
+    protected String getExistDataSql(TablePath tablePath) {
+        return String.format(
+                "SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM = 1",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
+    @Override
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        try {
+            return getConstraintKeys(
+                    metaData,
+                    tablePath.getDatabaseName(),
+                    tablePath.getSchemaName(),
+                    tablePath.getTableName());
+        } catch (SQLException e) {
+            log.info("Obtain constraint failure", e);
+            return new ArrayList<>();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
new file mode 100644
index 0000000000..ac0f3e24ae
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class XuguCatalogFactory implements CatalogFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return DatabaseIdentifier.XUGU;
+    }
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+        JdbcUrlUtil.UrlInfo urlInfo = OracleURLParser.parse(urlWithDatabase);
+        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+        if (!defaultDatabase.isPresent()) {
+            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+        }
+        return new XuguCatalog(
+                catalogName,
+                options.get(JdbcCatalogOptions.USERNAME),
+                options.get(JdbcCatalogOptions.PASSWORD),
+                urlInfo,
+                options.get(JdbcCatalogOptions.SCHEMA));
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return JdbcCatalogOptions.BASE_RULE.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..19bce1a8ca
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class XuguCreateTableSqlBuilder {
+
+    private List<Column> columns;
+    private PrimaryKey primaryKey;
+    private String sourceCatalogName;
+    private String fieldIde;
+
+    public XuguCreateTableSqlBuilder(CatalogTable catalogTable) {
+        this.columns = catalogTable.getTableSchema().getColumns();
+        this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        this.sourceCatalogName = catalogTable.getCatalogName();
+        this.fieldIde = catalogTable.getOptions().get("fieldIde");
+    }
+
+    public String build(TablePath tablePath) {
+        StringBuilder createTableSql = new StringBuilder();
+        createTableSql
+                .append("CREATE TABLE ")
+                .append(tablePath.getSchemaAndTableName("\""))
+                .append(" (\n");
+
+        List<String> columnSqls =
+                columns.stream()
+                        .map(column -> 
CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
+                        .collect(Collectors.toList());
+
+        // Add primary key directly in the create table statement
+        if (primaryKey != null
+                && primaryKey.getColumnNames() != null
+                && primaryKey.getColumnNames().size() > 0) {
+            columnSqls.add(buildPrimaryKeySql(primaryKey));
+        }
+
+        createTableSql.append(String.join(",\n", columnSqls));
+        createTableSql.append("\n)");
+
+        List<String> commentSqls =
+                columns.stream()
+                        .filter(column -> 
StringUtils.isNotBlank(column.getComment()))
+                        .map(
+                                column ->
+                                        buildColumnCommentSql(
+                                                column, 
tablePath.getSchemaAndTableName("\"")))
+                        .collect(Collectors.toList());
+
+        if (!commentSqls.isEmpty()) {
+            createTableSql.append(";\n");
+            createTableSql.append(String.join(";\n", commentSqls));
+        }
+
+        return createTableSql.toString();
+    }
+
+    private String buildColumnSql(Column column) {
+        StringBuilder columnSql = new StringBuilder();
+        columnSql.append("\"").append(column.getName()).append("\" ");
+
+        String columnType =
+                StringUtils.equalsIgnoreCase(DatabaseIdentifier.XUGU, 
sourceCatalogName)
+                        ? column.getSourceType()
+                        : 
XuguTypeConverter.INSTANCE.reconvert(column).getColumnType();
+        columnSql.append(columnType);
+
+        if (!column.isNullable()) {
+            columnSql.append(" NOT NULL");
+        }
+
+        return columnSql.toString();
+    }
+
+    private String buildPrimaryKeySql(PrimaryKey primaryKey) {
+        String randomSuffix = UUID.randomUUID().toString().replace("-", 
"").substring(0, 4);
+        String columnNamesString =
+                primaryKey.getColumnNames().stream()
+                        .map(columnName -> "\"" + columnName + "\"")
+                        .collect(Collectors.joining(", "));
+
+        // In xugu database, the maximum length for an identifier is 30 
characters.
+        String primaryKeyStr = primaryKey.getPrimaryKey();
+        if (primaryKeyStr.length() > 25) {
+            primaryKeyStr = primaryKeyStr.substring(0, 25);
+        }
+
+        return CatalogUtils.getFieldIde(
+                "CONSTRAINT "
+                        + primaryKeyStr
+                        + "_"
+                        + randomSuffix
+                        + " PRIMARY KEY ("
+                        + columnNamesString
+                        + ")",
+                fieldIde);
+    }
+
+    private String buildColumnCommentSql(Column column, String tableName) {
+        StringBuilder columnCommentSql = new StringBuilder();
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", 
fieldIde))
+                .append(tableName)
+                .append(".");
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier(column.getName(), 
fieldIde, "\""))
+                .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+                .append(column.getComment())
+                .append("'");
+        return columnCommentSql.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 3b1738afb2..2f6aabc502 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -38,4 +38,5 @@ public class DatabaseIdentifier {
     public static final String VERTICA = "Vertica";
     public static final String OCENABASE = "OceanBase";
     public static final String TIDB = "TiDB";
+    public static final String XUGU = "XUGU";
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
new file mode 100644
index 0000000000..1ef617b393
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class XuguDialect implements JdbcDialect {
+
+    private static final int DEFAULT_XUGU_FETCH_SIZE = 500;
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public XuguDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
+    public XuguDialect() {}
+
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.XUGU;
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new XuguJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new XuguTypeMapper();
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        if (identifier.contains(".")) {
+            String[] parts = identifier.split("\\.");
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < parts.length - 1; i++) {
+                sb.append("\"").append(parts[i]).append("\"").append(".");
+            }
+            return sb.append("\"")
+                    .append(getFieldIde(parts[parts.length - 1], fieldIde))
+                    .append("\"")
+                    .toString();
+        }
+
+        return "\"" + getFieldIde(identifier, fieldIde) + "\"";
+    }
+
+    @Override
+    public String tableIdentifier(String database, String tableName) {
+        return quoteIdentifier(tableName);
+    }
+
+    @Override
+    public String extractTableName(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName();
+    }
+
+    @Override
+    public TablePath parse(String tablePath) {
+        return TablePath.of(tablePath, true);
+    }
+
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName();
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
+        List<String> nonUniqueKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(fieldName -> 
!Arrays.asList(uniqueKeyFields).contains(fieldName))
+                        .collect(Collectors.toList());
+        String valuesBinding =
+                Arrays.stream(fieldNames)
+                        .map(fieldName -> ":" + fieldName + " " + 
quoteIdentifier(fieldName))
+                        .collect(Collectors.joining(", "));
+
+        String usingClause = String.format("SELECT %s FROM DUAL", 
valuesBinding);
+        String onConditions =
+                Arrays.stream(uniqueKeyFields)
+                        .map(
+                                fieldName ->
+                                        String.format(
+                                                "TARGET.%s=SOURCE.%s",
+                                                quoteIdentifier(fieldName),
+                                                quoteIdentifier(fieldName)))
+                        .collect(Collectors.joining(" AND "));
+        String updateSetClause =
+                nonUniqueKeyFields.stream()
+                        .map(
+                                fieldName ->
+                                        String.format(
+                                                "TARGET.%s=SOURCE.%s",
+                                                quoteIdentifier(fieldName),
+                                                quoteIdentifier(fieldName)))
+                        .collect(Collectors.joining(", "));
+        String insertFields =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String insertValues =
+                Arrays.stream(fieldNames)
+                        .map(fieldName -> "SOURCE." + 
quoteIdentifier(fieldName))
+                        .collect(Collectors.joining(", "));
+
+        String upsertSQL =
+                String.format(
+                        " MERGE INTO %s TARGET"
+                                + " USING (%s) SOURCE"
+                                + " ON (%s) "
+                                + " WHEN MATCHED THEN"
+                                + " UPDATE SET %s"
+                                + " WHEN NOT MATCHED THEN"
+                                + " INSERT (%s) VALUES (%s)",
+                        tableIdentifier(database, tableName),
+                        usingClause,
+                        onConditions,
+                        updateSetClause,
+                        insertFields,
+                        insertValues);
+
+        return Optional.of(upsertSQL);
+    }
+
+    @Override
+    public PreparedStatement creatPreparedStatement(
+            Connection connection, String queryTemplate, int fetchSize) throws 
SQLException {
+        PreparedStatement statement =
+                connection.prepareStatement(
+                        queryTemplate, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+        if (fetchSize > 0) {
+            statement.setFetchSize(fetchSize);
+        } else {
+            statement.setFetchSize(DEFAULT_XUGU_FETCH_SIZE);
+        }
+        return statement;
+    }
+
+    @Override
+    public Object queryNextChunkMax(
+            Connection connection,
+            JdbcSourceTable table,
+            String columnName,
+            int chunkSize,
+            Object includedLowerBound)
+            throws SQLException {
+        String quotedColumn = quoteIdentifier(columnName);
+        String sqlQuery;
+        if (StringUtils.isNotBlank(table.getQuery())) {
+            sqlQuery =
+                    String.format(
+                            "SELECT MAX(%s) FROM ("
+                                    + "SELECT %s FROM (%s) WHERE %s >= ? ORDER 
BY %s ASC "
+                                    + ") WHERE ROWNUM <= %s",
+                            quotedColumn,
+                            quotedColumn,
+                            table.getQuery(),
+                            quotedColumn,
+                            quotedColumn,
+                            chunkSize);
+        } else {
+            sqlQuery =
+                    String.format(
+                            "SELECT MAX(%s) FROM ("
+                                    + "SELECT %s FROM %s WHERE %s >= ? ORDER 
BY %s ASC "
+                                    + ") WHERE ROWNUM <= %s",
+                            quotedColumn,
+                            quotedColumn,
+                            table.getTablePath().getSchemaAndTableName(),
+                            quotedColumn,
+                            quotedColumn,
+                            chunkSize);
+        }
+
+        try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
+            ps.setObject(1, includedLowerBound);
+            try (ResultSet rs = ps.executeQuery()) {
+                if (!rs.next()) {
+                    // this should never happen
+                    throw new SQLException(
+                            String.format("No result returned after running 
query [%s]", sqlQuery));
+                }
+                return rs.getObject(1);
+            }
+        }
+    }
+
+    @Override
+    public ResultSetMetaData getResultSetMetaData(Connection conn, String 
query)
+            throws SQLException {
+        PreparedStatement ps = conn.prepareStatement(query);
+        return ps.executeQuery().getMetaData();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
new file mode 100644
index 0000000000..0e489b728b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+/** Factory for {@link XuguDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class XuguDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:xugu:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new XuguDialect();
+    }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+        return new XuguDialect(fieldIde);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguJdbcRowConverter.java
new file mode 100644
index 0000000000..4590761965
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguJdbcRowConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+public class XuguJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return DatabaseIdentifier.XUGU;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
new file mode 100644
index 0000000000..54a8805f3b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+// reference
+// 
https://docs.xugudb.com/%E8%99%9A%E8%B0%B7%E6%95%B0%E6%8D%AE%E5%BA%93%E5%AF%B9%E5%A4%96%E5%8F%91%E5%B8%83/06%E5%8F%82%E8%80%83%E6%8C%87%E5%8D%97/SQL%E8%AF%AD%E6%B3%95%E5%8F%82%E8%80%83/%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B/%E6%A6%82%E8%BF%B0/
+@Slf4j
+@AutoService(TypeConverter.class)
+public class XuguTypeConverter implements TypeConverter<BasicTypeDefine> {
+    // ============================data types=====================
+    // -------------------------number----------------------------
+    public static final String XUGU_NUMERIC = "NUMERIC";
+    public static final String XUGU_NUMBER = "NUMBER";
+    public static final String XUGU_DECIMAL = "DECIMAL";
+    public static final String XUGU_INTEGER = "INTEGER";
+    public static final String XUGU_INT = "INT";
+    public static final String XUGU_BIGINT = "BIGINT";
+    public static final String XUGU_TINYINT = "TINYINT";
+    public static final String XUGU_SMALLINT = "SMALLINT";
+    public static final String XUGU_FLOAT = "FLOAT";
+    public static final String XUGU_DOUBLE = "DOUBLE";
+
+    // ----------------------------string-------------------------
+    public static final String XUGU_CHAR = "CHAR";
+    public static final String XUGU_NCHAR = "NCHAR";
+    public static final String XUGU_VARCHAR = "VARCHAR";
+    public static final String XUGU_VARCHAR2 = "VARCHAR2";
+    public static final String XUGU_CLOB = "CLOB";
+
+    // ------------------------------time-------------------------
+    public static final String XUGU_DATE = "DATE";
+    public static final String XUGU_TIME = "TIME";
+    public static final String XUGU_TIMESTAMP = "TIMESTAMP";
+    public static final String XUGU_DATETIME = "DATETIME";
+    public static final String XUGU_DATETIME_WITH_TIME_ZONE = "DATETIME WITH 
TIME ZONE";
+    public static final String XUGU_TIME_WITH_TIME_ZONE = "TIME WITH TIME 
ZONE";
+    public static final String XUGU_TIMESTAMP_WITH_TIME_ZONE = "TIMESTAMP WITH 
TIME ZONE";
+
+    // ---------------------------binary---------------------------
+    public static final String XUGU_BINARY = "BINARY";
+    public static final String XUGU_BLOB = "BLOB";
+
+    // ---------------------------other---------------------------
+    public static final String XUGU_GUID = "GUID";
+    public static final String XUGU_BOOLEAN = "BOOLEAN";
+    public static final String XUGU_BOOL = "BOOL";
+    public static final String XUGU_JSON = "JSON";
+
+    public static final int MAX_PRECISION = 38;
+    public static final int DEFAULT_PRECISION = MAX_PRECISION;
+    public static final int MAX_SCALE = 38;
+    public static final int DEFAULT_SCALE = 18;
+    public static final int TIMESTAMP_DEFAULT_SCALE = 3;
+    public static final int MAX_TIMESTAMP_SCALE = 6;
+    public static final int MAX_TIME_SCALE = 3;
+    public static final long MAX_VARCHAR_LENGTH = 60000;
+    public static final long POWER_2_16 = (long) Math.pow(2, 16);
+    public static final long BYTES_2GB = (long) Math.pow(2, 31);
+    public static final long MAX_BINARY_LENGTH = POWER_2_16 - 4;
+    public static final XuguTypeConverter INSTANCE = new XuguTypeConverter();
+
+    @Override
+    public String identifier() {
+        return DatabaseIdentifier.XUGU;
+    }
+
+    @Override
+    public Column convert(BasicTypeDefine typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .sourceType(typeDefine.getColumnType())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+
+        String xuguDataType = typeDefine.getDataType().toUpperCase();
+        switch (xuguDataType) {
+            case XUGU_BOOLEAN:
+            case XUGU_BOOL:
+                builder.dataType(BasicType.BOOLEAN_TYPE);
+                break;
+            case XUGU_TINYINT:
+                builder.dataType(BasicType.BYTE_TYPE);
+                break;
+            case XUGU_SMALLINT:
+                builder.dataType(BasicType.SHORT_TYPE);
+                break;
+            case XUGU_INT:
+            case XUGU_INTEGER:
+                builder.dataType(BasicType.INT_TYPE);
+                break;
+            case XUGU_BIGINT:
+                builder.dataType(BasicType.LONG_TYPE);
+                break;
+            case XUGU_FLOAT:
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case XUGU_DOUBLE:
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case XUGU_NUMBER:
+            case XUGU_DECIMAL:
+            case XUGU_NUMERIC:
+                DecimalType decimalType;
+                if (typeDefine.getPrecision() != null && 
typeDefine.getPrecision() > 0) {
+                    decimalType =
+                            new DecimalType(
+                                    typeDefine.getPrecision().intValue(), 
typeDefine.getScale());
+                } else {
+                    decimalType = new DecimalType(DEFAULT_PRECISION, 
DEFAULT_SCALE);
+                }
+                builder.dataType(decimalType);
+                builder.columnLength(Long.valueOf(decimalType.getPrecision()));
+                builder.scale(decimalType.getScale());
+                break;
+
+            case XUGU_CHAR:
+            case XUGU_NCHAR:
+                builder.dataType(BasicType.STRING_TYPE);
+                if (typeDefine.getLength() == null || typeDefine.getLength() 
<= 0) {
+                    
builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                break;
+            case XUGU_VARCHAR:
+            case XUGU_VARCHAR2:
+                builder.dataType(BasicType.STRING_TYPE);
+                if (typeDefine.getLength() == null || typeDefine.getLength() 
<= 0) {
+                    
builder.columnLength(TypeDefineUtils.charTo4ByteLength(MAX_VARCHAR_LENGTH));
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                break;
+            case XUGU_CLOB:
+                builder.dataType(BasicType.STRING_TYPE);
+                builder.columnLength(BYTES_2GB - 1);
+                break;
+            case XUGU_JSON:
+            case XUGU_GUID:
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+            case XUGU_BINARY:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(MAX_BINARY_LENGTH);
+                break;
+            case XUGU_BLOB:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(BYTES_2GB - 1);
+                break;
+            case XUGU_DATE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+                break;
+            case XUGU_TIME:
+            case XUGU_TIME_WITH_TIME_ZONE:
+                builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
+                break;
+            case XUGU_DATETIME:
+            case XUGU_DATETIME_WITH_TIME_ZONE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                break;
+            case XUGU_TIMESTAMP:
+            case XUGU_TIMESTAMP_WITH_TIME_ZONE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                if (typeDefine.getScale() == null) {
+                    builder.scale(TIMESTAMP_DEFAULT_SCALE);
+                } else {
+                    builder.scale(typeDefine.getScale());
+                }
+                break;
+            default:
+                throw CommonError.convertToSeaTunnelTypeError(
+                        DatabaseIdentifier.XUGU, xuguDataType, 
typeDefine.getName());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public BasicTypeDefine reconvert(Column column) {
+        BasicTypeDefine.BasicTypeDefineBuilder builder =
+                BasicTypeDefine.builder()
+                        .name(column.getName())
+                        .nullable(column.isNullable())
+                        .comment(column.getComment())
+                        .defaultValue(column.getDefaultValue());
+        switch (column.getDataType().getSqlType()) {
+            case BOOLEAN:
+                builder.columnType(XUGU_BOOLEAN);
+                builder.dataType(XUGU_BOOLEAN);
+                break;
+            case TINYINT:
+                builder.columnType(XUGU_TINYINT);
+                builder.dataType(XUGU_TINYINT);
+                break;
+            case SMALLINT:
+                builder.columnType(XUGU_SMALLINT);
+                builder.dataType(XUGU_SMALLINT);
+                break;
+            case INT:
+                builder.columnType(XUGU_INTEGER);
+                builder.dataType(XUGU_INTEGER);
+                break;
+            case BIGINT:
+                builder.columnType(XUGU_BIGINT);
+                builder.dataType(XUGU_BIGINT);
+                break;
+            case FLOAT:
+                builder.columnType(XUGU_FLOAT);
+                builder.dataType(XUGU_FLOAT);
+                break;
+            case DOUBLE:
+                builder.columnType(XUGU_DOUBLE);
+                builder.dataType(XUGU_DOUBLE);
+                break;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) column.getDataType();
+                long precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                if (precision <= 0) {
+                    precision = DEFAULT_PRECISION;
+                    scale = DEFAULT_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which is precision less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (precision > MAX_PRECISION) {
+                    scale = (int) Math.max(0, scale - (precision - 
MAX_PRECISION));
+                    precision = MAX_PRECISION;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which exceeds the maximum precision of 
{}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            MAX_PRECISION,
+                            precision,
+                            scale);
+                }
+                if (scale < 0) {
+                    scale = 0;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which is scale less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (scale > MAX_SCALE) {
+                    scale = MAX_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out 
of range, "
+                                    + "which exceeds the maximum scale of {}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            MAX_SCALE,
+                            precision,
+                            scale);
+                }
+                builder.columnType(String.format("%s(%s,%s)", XUGU_NUMERIC, 
precision, scale));
+                builder.dataType(XUGU_NUMERIC);
+                builder.precision(precision);
+                builder.scale(scale);
+                break;
+            case BYTES:
+                if (column.getColumnLength() == null || 
column.getColumnLength() <= 0) {
+                    builder.columnType(XUGU_BLOB);
+                    builder.dataType(XUGU_BLOB);
+                } else if (column.getColumnLength() <= MAX_BINARY_LENGTH) {
+                    builder.columnType(XUGU_BINARY);
+                    builder.dataType(XUGU_BINARY);
+                } else {
+                    builder.columnType(XUGU_BLOB);
+                    builder.dataType(XUGU_BLOB);
+                }
+                break;
+            case STRING:
+                if (column.getColumnLength() == null || 
column.getColumnLength() <= 0) {
+                    builder.columnType(String.format("%s(%s)", XUGU_VARCHAR, 
MAX_VARCHAR_LENGTH));
+                    builder.dataType(XUGU_VARCHAR);
+                } else if (column.getColumnLength() <= MAX_VARCHAR_LENGTH) {
+                    builder.columnType(
+                            String.format("%s(%s)", XUGU_VARCHAR, 
column.getColumnLength()));
+                    builder.dataType(XUGU_VARCHAR);
+                } else {
+                    builder.columnType(XUGU_CLOB);
+                    builder.dataType(XUGU_CLOB);
+                }
+                break;
+            case DATE:
+                builder.columnType(XUGU_DATE);
+                builder.dataType(XUGU_DATE);
+                break;
+            case TIME:
+                builder.dataType(XUGU_TIME);
+                if (column.getScale() != null && column.getScale() > 0) {
+                    Integer timeScale = column.getScale();
+                    if (timeScale > MAX_TIME_SCALE) {
+                        timeScale = MAX_TIME_SCALE;
+                        log.warn(
+                                "The time column {} type time({}) is out of 
range, "
+                                        + "which exceeds the maximum scale of 
{}, "
+                                        + "it will be converted to time({})",
+                                column.getName(),
+                                column.getScale(),
+                                MAX_SCALE,
+                                timeScale);
+                    }
+                    builder.columnType(String.format("%s(%s)", XUGU_TIME, 
timeScale));
+                    builder.scale(timeScale);
+                } else {
+                    builder.columnType(XUGU_TIME);
+                }
+                break;
+            case TIMESTAMP:
+                if (column.getScale() == null || column.getScale() <= 0) {
+                    builder.columnType(XUGU_TIMESTAMP);
+                } else {
+                    int timestampScale = column.getScale();
+                    if (column.getScale() > MAX_TIMESTAMP_SCALE) {
+                        timestampScale = MAX_TIMESTAMP_SCALE;
+                        log.warn(
+                                "The timestamp column {} type timestamp({}) is 
out of range, "
+                                        + "which exceeds the maximum scale of 
{}, "
+                                        + "it will be converted to 
timestamp({})",
+                                column.getName(),
+                                column.getScale(),
+                                MAX_TIMESTAMP_SCALE,
+                                timestampScale);
+                    }
+                    builder.columnType(String.format("TIMESTAMP(%s)", 
timestampScale));
+                    builder.scale(timestampScale);
+                }
+                builder.dataType(XUGU_TIMESTAMP);
+                break;
+            default:
+                throw CommonError.convertToConnectorTypeError(
+                        DatabaseIdentifier.XUGU,
+                        column.getDataType().getSqlType().name(),
+                        column.getName());
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeMapper.java
new file mode 100644
index 0000000000..e517f56af0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeMapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+@Slf4j
+public class XuguTypeMapper implements JdbcDialectTypeMapper {
+
+    @Override
+    public Column mappingColumn(BasicTypeDefine typeDefine) {
+        return XuguTypeConverter.INSTANCE.convert(typeDefine);
+    }
+
+    @Override
+    public Column mappingColumn(ResultSetMetaData metadata, int colIndex) 
throws SQLException {
+        String columnName = metadata.getColumnLabel(colIndex);
+        String nativeType = metadata.getColumnTypeName(colIndex);
+        int isNullable = metadata.isNullable(colIndex);
+        long precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        if (Arrays.asList("CHAR", "NCHAR", "VARCHAR", 
"VARCHAR2").contains(nativeType)) {
+            long octetByteLength = 
TypeDefineUtils.charTo4ByteLength(precision);
+            precision = octetByteLength;
+        }
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(columnName)
+                        .columnType(nativeType)
+                        .dataType(nativeType)
+                        .nullable(isNullable == 
ResultSetMetaData.columnNullable)
+                        .length(precision)
+                        .precision(precision)
+                        .scale(scale)
+                        .build();
+        return mappingColumn(typeDefine);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverterTest.java
new file mode 100644
index 0000000000..9dfd7079df
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverterTest.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter.BYTES_2GB;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter.MAX_BINARY_LENGTH;
+
+public class XuguTypeConverterTest {
+    @Test
+    public void testConvertUnsupported() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("aaa").dataType("aaa").build();
+        try {
+            XuguTypeConverter.INSTANCE.convert(typeDefine);
+            Assertions.fail();
+        } catch (SeaTunnelRuntimeException e) {
+            // ignore
+        } catch (Throwable e) {
+            Assertions.fail();
+        }
+    }
+
+    @Test
+    public void testReconvertUnsupported() {
+        Column column =
+                PhysicalColumn.of(
+                        "test",
+                        new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE),
+                        (Long) null,
+                        true,
+                        null,
+                        null);
+        try {
+            XuguTypeConverter.INSTANCE.reconvert(column);
+            Assertions.fail();
+        } catch (SeaTunnelRuntimeException e) {
+            // ignore
+        } catch (Throwable e) {
+            Assertions.fail();
+        }
+    }
+
+    @Test
+    public void testConvertBoolean() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("bool")
+                        .dataType("boolean")
+                        .nullable(true)
+                        .defaultValue("1")
+                        .comment("test")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+        Assertions.assertEquals(typeDefine.isNullable(), column.isNullable());
+        Assertions.assertEquals(typeDefine.getDefaultValue(), 
column.getDefaultValue());
+        Assertions.assertEquals(typeDefine.getComment(), column.getComment());
+    }
+
+    @Test
+    public void testConvertTinyint() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("tinyint")
+                        .dataType("tinyint")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.BYTE_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertSmallint() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("smallint")
+                        .dataType("smallint")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertInt() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("int").dataType("int").build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertBigint() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("bigint")
+                        .dataType("bigint")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertFloat() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("float")
+                        .dataType("float")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertDouble() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("double")
+                        .dataType("double")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertDecimal() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("numeric(38,2)")
+                        .dataType("numeric")
+                        .precision(38L)
+                        .scale(2)
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 2), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("numeric")
+                        .dataType("numeric")
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertChar() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("char").dataType("char").build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(4, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("char(10)")
+                        .dataType("char")
+                        .length(10L)
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(10, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertVarchar() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("varchar")
+                        .dataType("varchar")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(240000, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("varchar(10)")
+                        .dataType("varchar")
+                        .length(10L)
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(10, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("varchar2(20)")
+                        .dataType("varchar2")
+                        .length(20L)
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(20, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertOtherString() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("clob").dataType("clob").build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(BYTES_2GB - 1, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("json").dataType("json").build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(null, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertBinary() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("blob").dataType("blob").build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
+        Assertions.assertEquals(BYTES_2GB - 1, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertDate() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("date").dataType("date").build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertTime() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("time").dataType("time").build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("time with time zone")
+                        .dataType("time with time zone")
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testConvertTimestamp() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("datetime")
+                        .dataType("datetime")
+                        .build();
+        Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("datetime with time zone")
+                        .dataType("datetime with time zone")
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("timestamp")
+                        .dataType("timestamp")
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(3, column.getScale());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("timestamp(6)")
+                        .dataType("timestamp")
+                        .scale(6)
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("timestamp with time zone")
+                        .dataType("timestamp with time zone")
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(3, column.getScale());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("timestamp(3) with time zone")
+                        .dataType("timestamp with time zone")
+                        .scale(3)
+                        .build();
+        column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
+
+    @Test
+    public void testReconvertBoolean() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.BOOLEAN_TYPE)
+                        .nullable(true)
+                        .defaultValue(true)
+                        .comment("test")
+                        .build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BOOLEAN, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BOOLEAN, 
typeDefine.getDataType());
+        Assertions.assertEquals(column.isNullable(), typeDefine.isNullable());
+        Assertions.assertEquals(column.getDefaultValue(), 
typeDefine.getDefaultValue());
+        Assertions.assertEquals(column.getComment(), typeDefine.getComment());
+    }
+
+    @Test
+    public void testReconvertByte() {
+        Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TINYINT, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TINYINT, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertShort() {
+        Column column =
+                
PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_SMALLINT, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_SMALLINT, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertInt() {
+        Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_INTEGER, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_INTEGER, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertLong() {
+        Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BIGINT, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BIGINT, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertFloat() {
+        Column column =
+                
PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_FLOAT, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_FLOAT, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertDouble() {
+        Column column =
+                
PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_DOUBLE, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_DOUBLE, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertDecimal() {
+        Column column =
+                PhysicalColumn.builder().name("test").dataType(new 
DecimalType(0, 0)).build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format(
+                        "%s(%s,%s)",
+                        XuguTypeConverter.XUGU_NUMERIC,
+                        XuguTypeConverter.DEFAULT_PRECISION,
+                        XuguTypeConverter.DEFAULT_SCALE),
+                typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_NUMERIC, 
typeDefine.getDataType());
+
+        column = PhysicalColumn.builder().name("test").dataType(new 
DecimalType(10, 2)).build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s,%s)", XuguTypeConverter.XUGU_NUMERIC, 10, 
2),
+                typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_NUMERIC, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertBytes() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(PrimitiveByteArrayType.INSTANCE)
+                        .columnLength(null)
+                        .build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BLOB, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BLOB, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(PrimitiveByteArrayType.INSTANCE)
+                        .columnLength(MAX_BINARY_LENGTH)
+                        .build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BINARY, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_BINARY, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertString() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(null)
+                        .build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals("VARCHAR(60000)", typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_VARCHAR, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(1L)
+                        .build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s)", XuguTypeConverter.XUGU_VARCHAR, 
column.getColumnLength()),
+                typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_VARCHAR, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(60000L)
+                        .build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s)", XuguTypeConverter.XUGU_VARCHAR, 
column.getColumnLength()),
+                typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_VARCHAR, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(BasicType.STRING_TYPE)
+                        .columnLength(60001L)
+                        .build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_CLOB, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_CLOB, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertDate() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TYPE)
+                        .build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_DATE, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_DATE, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertTime() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_TIME_TYPE)
+                        .build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TIME, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TIME, 
typeDefine.getDataType());
+    }
+
+    @Test
+    public void testReconvertDatetime() {
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .build();
+
+        BasicTypeDefine typeDefine = 
XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TIMESTAMP, 
typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TIMESTAMP, 
typeDefine.getDataType());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .scale(3)
+                        .build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s)", XuguTypeConverter.XUGU_TIMESTAMP, 
column.getScale()),
+                typeDefine.getColumnType());
+        Assertions.assertEquals(XuguTypeConverter.XUGU_TIMESTAMP, 
typeDefine.getDataType());
+        Assertions.assertEquals(column.getScale(), typeDefine.getScale());
+
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+                        .scale(6)
+                        .build();
+
+        typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s)", XuguTypeConverter.XUGU_TIMESTAMP, 6),
+                typeDefine.getColumnType());
+    }
+}
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml 
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 5c1171a82d..1da0d3014b 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -203,6 +203,7 @@
                 <include>com.teradata.jdbc:terajdbc4:jar</include>
                 <include>com.amazon.redshift:redshift-jdbc42:jar</include>
                 <include>net.snowflake.snowflake-jdbc:jar</include>
+                <include>com.xugudb:xugu-jdbc:jar</include>
             </includes>
             
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
             <outputDirectory>/lib</outputDirectory>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
index 09d511594f..af4c61d5b6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
@@ -91,6 +91,11 @@
             <artifactId>vertica-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.xugudb</groupId>
+            <artifactId>xugu-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
new file mode 100644
index 0000000000..5fdae0ad93
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu.XuguCatalog;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class JdbcXuguIT extends AbstractJdbcIT {
+
+    private static final String XUGU_IMAGE = "xugudb/xugudb:v12";
+    private static final String XUGU_CONTAINER_HOST = "e2e_xugudb";
+    private static final String XUGU_SCHEMA = "SYSDBA";
+    private static final String XUGU_DATABASE = "SYSTEM";
+    private static final String XUGU_SOURCE = "e2e_table_source";
+    private static final String XUGU_SINK = "e2e_table_sink";
+    private static final String CATALOG_DATABASE = "catalog_database";
+    private static final String CATALOG_TABLE = "e2e_table_catalog";
+    private static final String XUGU_USERNAME = "SYSDBA";
+    private static final String XUGU_PASSWORD = "SYSDBA";
+    private static final int XUGU_PORT = 5138;
+    private static final String XUGU_URL = "jdbc:xugu://" + HOST + ":%s/%s";
+
+    private static final String DRIVER_CLASS = "com.xugu.cloudjdbc.Driver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList(
+                    "/jdbc_xugu_source_and_upsert_sink.conf", 
"/jdbc_xugu_source_and_sink.conf");
+    private static final String CREATE_SQL =
+            "create table if not exists %s"
+                    + "(\n"
+                    + "    XUGU_NUMERIC                   NUMERIC(10,2),\n"
+                    + "    XUGU_NUMBER                    NUMBER(10,2),\n"
+                    + "    XUGU_INTEGER                   INTEGER,\n"
+                    + "    XUGU_INT                       INT,\n"
+                    + "    XUGU_BIGINT                    BIGINT,\n"
+                    + "    XUGU_TINYINT                   TINYINT,\n"
+                    + "    XUGU_SMALLINT                  SMALLINT,\n"
+                    + "    XUGU_FLOAT                     FLOAT,\n"
+                    + "    XUGU_DOUBLE                    DOUBLE,\n"
+                    + "    XUGU_CHAR                      CHAR,\n"
+                    + "    XUGU_NCHAR                     NCHAR,\n"
+                    + "    XUGU_VARCHAR                   VARCHAR,\n"
+                    + "    XUGU_VARCHAR2                  VARCHAR2,\n"
+                    + "    XUGU_CLOB                      CLOB,\n"
+                    + "    XUGU_DATE                      DATE,\n"
+                    + "    XUGU_TIME                      TIME,\n"
+                    + "    XUGU_TIMESTAMP                 TIMESTAMP,\n"
+                    + "    XUGU_DATETIME                  DATETIME,\n"
+                    + "    XUGU_TIME_WITH_TIME_ZONE       TIME WITH TIME 
ZONE,\n"
+                    + "    XUGU_TIMESTAMP_WITH_TIME_ZONE  TIMESTAMP WITH TIME 
ZONE,\n"
+                    + "    XUGU_BINARY                    BINARY,\n"
+                    + "    XUGU_BLOB                      BLOB,\n"
+                    + "    XUGU_GUID                      GUID,\n"
+                    + "    XUGU_BOOLEAN                   BOOLEAN,\n"
+                    + "    CONSTRAINT \"XUGU_PK\" PRIMARY KEY(XUGU_INT)"
+                    + ")";
+    private static final String[] fieldNames =
+            new String[] {
+                "XUGU_NUMERIC",
+                "XUGU_NUMBER",
+                "XUGU_INTEGER",
+                "XUGU_INT",
+                "XUGU_BIGINT",
+                "XUGU_TINYINT",
+                "XUGU_SMALLINT",
+                "XUGU_FLOAT",
+                "XUGU_DOUBLE",
+                "XUGU_CHAR",
+                "XUGU_NCHAR",
+                "XUGU_VARCHAR",
+                "XUGU_VARCHAR2",
+                "XUGU_CLOB",
+                "XUGU_DATE",
+                "XUGU_TIME",
+                "XUGU_TIMESTAMP",
+                "XUGU_DATETIME",
+                "XUGU_TIME_WITH_TIME_ZONE",
+                "XUGU_TIMESTAMP_WITH_TIME_ZONE",
+                "XUGU_BINARY",
+                "XUGU_BLOB",
+                "XUGU_GUID",
+                "XUGU_BOOLEAN"
+            };
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(XUGU_URL, XUGU_PORT, XUGU_DATABASE);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(XUGU_SCHEMA, XUGU_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(XUGU_IMAGE)
+                .networkAliases(XUGU_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(XUGU_PORT)
+                .localPort(XUGU_PORT)
+                .jdbcTemplate(XUGU_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(XUGU_USERNAME)
+                .password(XUGU_PASSWORD)
+                .schema(XUGU_SCHEMA)
+                .database(XUGU_DATABASE)
+                .sourceTable(XUGU_SOURCE)
+                .sinkTable(XUGU_SINK)
+                .catalogDatabase(CATALOG_DATABASE)
+                .catalogSchema(XUGU_SCHEMA)
+                .catalogTable(CATALOG_TABLE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
+    }
+
+    @Override
+    void compareResult(String executeKey) {
+        defaultCompare(executeKey, fieldNames, "XUGU_INT");
+    }
+
+    @Override
+    String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar";;
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                BigDecimal.valueOf(1.12),
+                                BigDecimal.valueOf(i, 2),
+                                i,
+                                i,
+                                Long.parseLong("1"),
+                                i,
+                                i,
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.1"),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                Date.valueOf(LocalDate.now()),
+                                Time.valueOf(LocalTime.now()),
+                                new Timestamp(System.currentTimeMillis()),
+                                Timestamp.valueOf(LocalDateTime.now()),
+                                Time.valueOf(LocalTime.now()),
+                                new Timestamp(System.currentTimeMillis()),
+                                null,
+                                null,
+                                null,
+                                false
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    protected GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(XUGU_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(XUGU_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(XUGU_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
XUGU_PORT, XUGU_PORT)));
+
+        return container;
+    }
+
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        clearTable(schema, table);
+    }
+
+    @Override
+    protected String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(schema, table);
+    }
+
+    @Override
+    protected void initCatalog() {
+        String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, 
dbServer.getHost());
+        catalog =
+                new XuguCatalog(
+                        "xugu",
+                        jdbcCase.getUserName(),
+                        jdbcCase.getPassword(),
+                        JdbcUrlUtil.getUrlInfo(jdbcUrl),
+                        XUGU_SCHEMA);
+        catalog.open();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_sink.conf
new file mode 100644
index 0000000000..09154809f3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_sink.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM?batch_mode=false"
+    driver = "com.xugu.cloudjdbc.Driver"
+    connection_check_timeout_sec = 100
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = "select * from e2e_table_source;"
+  }
+}
+
+transform {
+}
+
+sink {
+  jdbc {
+    url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM?batch_mode=false"
+    driver = "com.xugu.cloudjdbc.Driver"
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = """INSERT INTO SYSDBA.e2e_table_sink
+             (XUGU_NUMERIC, XUGU_NUMBER, XUGU_INTEGER, XUGU_INT, XUGU_BIGINT, 
XUGU_TINYINT, XUGU_SMALLINT, XUGU_FLOAT, XUGU_DOUBLE, XUGU_CHAR, XUGU_NCHAR, 
XUGU_VARCHAR, XUGU_VARCHAR2, XUGU_CLOB, XUGU_DATE, XUGU_TIME, XUGU_TIMESTAMP, 
XUGU_DATETIME, XUGU_TIME_WITH_TIME_ZONE, XUGU_TIMESTAMP_WITH_TIME_ZONE, 
XUGU_BINARY, XUGU_BLOB, XUGU_GUID, XUGU_BOOLEAN)
+             VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?);"""
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_upsert_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_upsert_sink.conf
new file mode 100644
index 0000000000..669118f166
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_upsert_sink.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM"
+    driver = "com.xugu.cloudjdbc.Driver"
+    connection_check_timeout_sec = 100
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = "select * from e2e_table_source;"
+  }
+}
+
+transform {
+}
+
+sink {
+  jdbc {
+    url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM?batch_mode=false"
+    driver = "com.xugu.cloudjdbc.Driver"
+    user = "SYSDBA"
+    password = "SYSDBA"
+    generate_sink_sql = true
+    primary_keys = ["XUGU_INT"]
+    table = "SYSDBA.e2e_table_sink"
+    database = "SYSTEM"
+  }
+}

Reply via email to