This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 80f392afbb [feature][connector-v2] add xugudb connector (#6561)
80f392afbb is described below
commit 80f392afbb654145ea3188ba51b1f4bafcd2ef5b
Author: L-Gryps <[email protected]>
AuthorDate: Tue Apr 2 16:42:52 2024 +0800
[feature][connector-v2] add xugudb connector (#6561)
---
.github/workflows/backend.yml | 2 +-
docs/en/connector-v2/sink/Jdbc.md | 1 +
docs/en/connector-v2/source/Jdbc.md | 1 +
seatunnel-connectors-v2/connector-jdbc/pom.xml | 7 +
.../seatunnel/jdbc/catalog/xugu/XuguCatalog.java | 266 +++++++++
.../jdbc/catalog/xugu/XuguCatalogFactory.java | 63 ++
.../catalog/xugu/XuguCreateTableSqlBuilder.java | 141 +++++
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../jdbc/internal/dialect/xugu/XuguDialect.java | 231 ++++++++
.../internal/dialect/xugu/XuguDialectFactory.java | 45 ++
.../dialect/xugu/XuguJdbcRowConverter.java | 29 +
.../internal/dialect/xugu/XuguTypeConverter.java | 385 ++++++++++++
.../jdbc/internal/dialect/xugu/XuguTypeMapper.java | 63 ++
.../dialect/xugu/XuguTypeConverterTest.java | 660 +++++++++++++++++++++
.../src/main/assembly/assembly-bin-ci.xml | 1 +
.../connector-jdbc-e2e-part-7/pom.xml | 5 +
.../connectors/seatunnel/jdbc/JdbcXuguIT.java | 246 ++++++++
.../test/resources/jdbc_xugu_source_and_sink.conf | 47 ++
.../jdbc_xugu_source_and_upsert_sink.conf | 48 ++
19 files changed, 2241 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 519cf8533d..9975d477da 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -997,7 +997,7 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- - name: run jdbc connectors integration test (part-6)
+ - name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl
:connector-jdbc-e2e-part-7 -am -Pci
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index f0b74414a4..c2591761ec 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -235,6 +235,7 @@ there are some reference value for params above.
| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
|
+| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 | /
|
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
## Example
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 09c3ab636d..225576001d 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -227,6 +227,7 @@ there are some reference value for params above.
| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test |
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 |
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
|
| Hive | org.apache.hive.jdbc.HiveDriver |
jdbc:hive2://localhost:10000 |
https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar
|
+| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 |
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
## Example
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 5880036c90..db8c95dd0f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -50,6 +50,7 @@
<kingbase8.version>8.6.0</kingbase8.version>
<hive.jdbc.version>3.1.3</hive.jdbc.version>
<oceanbase.jdbc.version>2.4.3</oceanbase.jdbc.version>
+ <xugu.jdbc.version>12.2.0</xugu.jdbc.version>
</properties>
<dependencyManagement>
@@ -188,6 +189,12 @@
<version>${oceanbase.jdbc.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.xugudb</groupId>
+ <artifactId>xugu-jdbc</artifactId>
+ <version>${xugu.jdbc.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
new file mode 100644
index 0000000000..462e109c76
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class XuguCatalog extends AbstractJdbcCatalog {
+
+ protected static List<String> EXCLUDED_SCHEMAS =
+ Collections.unmodifiableList(Arrays.asList("GUEST", "SYSAUDITOR",
"SYSSSO"));
+
+ private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+ "SELECT\n"
+ + " dc.COLUMN_NAME,\n"
+ + " CASE\n"
+ + " WHEN dc.TYPE_NAME LIKE 'INTERVAL%%' THEN
'INTERVAL' ELSE REGEXP_SUBSTR(dc.TYPE_NAME, '^[^(]+')\n"
+ + " END AS TYPE_NAME,\n"
+ + " dc.TYPE_NAME ||\n"
+ + " CASE\n"
+ + " WHEN dc.TYPE_NAME IN ('VARCHAR', 'CHAR') THEN
'(' || dc.COLUMN_LENGTH || ')'\n"
+ + " WHEN dc.TYPE_NAME IN ('NUMERIC') AND
dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NOT NULL THEN '(' ||
dc.COLUMN_PRECISION || ', ' || dc.COLUMN_SCALE || ')'\n"
+ + " WHEN dc.TYPE_NAME IN ('NUMERIC') AND
dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NULL THEN '(' ||
dc.COLUMN_PRECISION || ')'\n"
+ + " WHEN dc.TYPE_NAME IN ('TIMESTAMP') THEN '(' ||
dc.COLUMN_SCALE || ')'\n"
+ + " END AS FULL_TYPE_NAME,\n"
+ + " dc.COLUMN_LENGTH,\n"
+ + " dc.COLUMN_PRECISION,\n"
+ + " dc.COLUMN_SCALE,\n"
+ + " dc.COLUMN_COMMENT,\n"
+ + " dc.DEFAULT_VALUE,\n"
+ + " CASE\n"
+ + " dc.IS_NULLABLE WHEN TRUE THEN 'NO' ELSE 'YES'\n"
+ + " END AS IS_NULLABLE\n"
+ + "FROM\n"
+ + " (\n"
+ + " SELECT\n"
+ + " c.col_name AS COLUMN_NAME,\n"
+ + " CASE\n"
+ + " WHEN c.type_name = 'CHAR' AND c.\"VARYING\"
= TRUE THEN 'VARCHAR'\n"
+ + " WHEN c.type_name = 'DATETIME' AND
c.TIMESTAMP_T = 'i' THEN 'TIMESTAMP' ELSE c.type_name\n"
+ + " END AS TYPE_NAME,\n"
+ + " DECODE(c.type_name,\n"
+ + " 'TINYINT', 1, 'SMALLINT', 2,\n"
+ + " 'INTEGER', 4, 'BIGINT', 8,\n"
+ + " 'FLOAT', 4, 'DOUBLE', 8,\n"
+ + " 'NUMERIC', 17,\n"
+ + " 'CHAR', DECODE(c.scale, -1, 60000, c.scale),\n"
+ + " 'DATE', 4, 'DATETIME', 8,\n"
+ + " 'TIMESTAMP', 8, 'DATETIME WITH TIME ZONE', 8,\n"
+ + " 'TIME', 4, 'TIME WITH TIME ZONE', 4,\n"
+ + " 'INTERVAL YEAR', 4, 'INTERVAL MONTH', 4,\n"
+ + " 'INTERVAL DAY', 4, 'INTERVAL HOUR', 4,\n"
+ + " 'INTERVAL MINUTE', 4, 'INTERVAL SECOND', 8,\n"
+ + " 'INTERVAL YEAR TO MONTH', 4,\n"
+ + " 'INTERVAL DAY TO HOUR', 4,\n"
+ + " 'INTERVAL DAY TO MINUTE', 4,\n"
+ + " 'INTERVAL DAY TO SECOND', 8,\n"
+ + " 'INTERVAL HOUR TO MINUTE', 4,\n"
+ + " 'INTERVAL HOUR TO SECOND', 8,\n"
+ + " 'INTERVAL MINUTE TO SECOND', 8,\n"
+ + " 'CLOB', 2147483648,\n"
+ + " 'BLOB', 2147483648, 'BINARY', 2147483648,\n"
+ + " 'GUID', 2, 'BOOLEAN', 1,\n"
+ + " 'ROWVERSION', 8, 'ROWID', 10, NULL) AS
COLUMN_LENGTH,\n"
+ + " DECODE(TRUNC(c.scale / 65536), 0, NULL,
TRUNC(c.scale / 65536)::INTEGER) AS COLUMN_PRECISION,\n"
+ + " DECODE(DECODE(c.type_name, 'CHAR',-1,
c.scale),-1, NULL, MOD(c.scale, 65536)) AS COLUMN_SCALE,\n"
+ + " c.comments AS COLUMN_COMMENT,\n"
+ + " c.DEF_VAL AS DEFAULT_VALUE,\n"
+ + " c.NOT_NULl AS IS_NULLABLE\n"
+ + " FROM\n"
+ + " dba_columns c\n"
+ + " LEFT JOIN dba_tables tab ON\n"
+ + " c.db_id = tab.db_id\n"
+ + " AND c.table_id = tab.table_id\n"
+ + " LEFT JOIN dba_schemas sc ON\n"
+ + " tab.schema_id = sc.schema_id\n"
+ + " AND tab.db_id = sc.db_id\n"
+ + " WHERE\n"
+ + " sc.schema_name = '%s'\n"
+ + " AND tab.table_name = '%s'\n"
+ + ") AS dc \n";
+
+ public XuguCatalog(
+ String catalogName,
+ String username,
+ String pwd,
+ JdbcUrlUtil.UrlInfo urlInfo,
+ String defaultSchema) {
+ super(catalogName, username, pwd, urlInfo, defaultSchema);
+ }
+
+ @Override
+ protected String getListDatabaseSql() {
+ return "SELECT DB_NAME FROM dba_databases";
+ }
+
+ @Override
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ return new XuguCreateTableSqlBuilder(table).build(tablePath);
+ }
+
+ @Override
+ protected String getDropTableSql(TablePath tablePath) {
+ return String.format("DROP TABLE %s",
tablePath.getSchemaAndTableName("\""));
+ }
+
+ @Override
+ protected String getCreateDatabaseSql(String databaseName) {
+ return String.format("CREATE DATABASE \"%s\"", databaseName);
+ }
+
+ @Override
+ protected String getDropDatabaseSql(String databaseName) {
+ return String.format("DROP DATABASE \"%s\"", databaseName);
+ }
+
+ @Override
+ protected String getListTableSql(String databaseName) {
+ return "SELECT user_name ,table_name FROM all_users au \n"
+ + "INNER JOIN all_tables at ON au.user_id=at.user_id AND
au.db_id=at.db_id";
+ }
+
+ @Override
+ protected String getTableName(ResultSet rs) throws SQLException {
+ if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+ return null;
+ }
+ return rs.getString(1) + "." + rs.getString(2);
+ }
+
+ @Override
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ return String.format(
+ SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
tablePath.getTableName());
+ }
+
+ @Override
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ String typeName = resultSet.getString("TYPE_NAME");
+ String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
+ long columnLength = resultSet.getLong("COLUMN_LENGTH");
+ Long columnPrecision = resultSet.getObject("COLUMN_PRECISION",
Long.class);
+ Integer columnScale = resultSet.getObject("COLUMN_SCALE",
Integer.class);
+ String columnComment = resultSet.getString("COLUMN_COMMENT");
+ Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
+ boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
+
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name(columnName)
+ .columnType(fullTypeName)
+ .dataType(typeName)
+ .length(columnLength)
+ .precision(columnPrecision)
+ .scale(columnScale)
+ .nullable(isNullable)
+ .defaultValue(defaultValue)
+ .comment(columnComment)
+ .build();
+ return XuguTypeConverter.INSTANCE.convert(typeDefine);
+ }
+
+ @Override
+ protected String getUrlFromDatabaseName(String databaseName) {
+ return defaultUrl;
+ }
+
+ @Override
+ protected String getOptionTableName(TablePath tablePath) {
+ return tablePath.getSchemaAndTableName();
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ try {
+ if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
+ return databaseExists(tablePath.getDatabaseName())
+ && listTables(tablePath.getDatabaseName())
+ .contains(tablePath.getSchemaAndTableName());
+ }
+ return listTables().contains(tablePath.getSchemaAndTableName());
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ }
+
+ private List<String> listTables() {
+ List<String> databases = listDatabases();
+ return listTables(databases.get(0));
+ }
+
+ @Override
+ public CatalogTable getTable(String sqlQuery) throws SQLException {
+ Connection defaultConnection = getConnection(defaultUrl);
+ return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new
XuguTypeMapper());
+ }
+
+ @Override
+ protected String getTruncateTableSql(TablePath tablePath) {
+ return String.format(
+ "TRUNCATE TABLE \"%s\".\"%s\"",
+ tablePath.getSchemaName(), tablePath.getTableName());
+ }
+
+ @Override
+ protected String getExistDataSql(TablePath tablePath) {
+ return String.format(
+ "SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM = 1",
+ tablePath.getSchemaName(), tablePath.getTableName());
+ }
+
+ @Override
+ protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ try {
+ return getConstraintKeys(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ } catch (SQLException e) {
+ log.info("Obtain constraint failure", e);
+ return new ArrayList<>();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
new file mode 100644
index 0000000000..ac0f3e24ae
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalogFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class XuguCatalogFactory implements CatalogFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return DatabaseIdentifier.XUGU;
+ }
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+ JdbcUrlUtil.UrlInfo urlInfo = OracleURLParser.parse(urlWithDatabase);
+ Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+ if (!defaultDatabase.isPresent()) {
+ throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+ }
+ return new XuguCatalog(
+ catalogName,
+ options.get(JdbcCatalogOptions.USERNAME),
+ options.get(JdbcCatalogOptions.PASSWORD),
+ urlInfo,
+ options.get(JdbcCatalogOptions.SCHEMA));
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return JdbcCatalogOptions.BASE_RULE.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..19bce1a8ca
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class XuguCreateTableSqlBuilder {
+
+ private List<Column> columns;
+ private PrimaryKey primaryKey;
+ private String sourceCatalogName;
+ private String fieldIde;
+
+ public XuguCreateTableSqlBuilder(CatalogTable catalogTable) {
+ this.columns = catalogTable.getTableSchema().getColumns();
+ this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+ this.sourceCatalogName = catalogTable.getCatalogName();
+ this.fieldIde = catalogTable.getOptions().get("fieldIde");
+ }
+
+ public String build(TablePath tablePath) {
+ StringBuilder createTableSql = new StringBuilder();
+ createTableSql
+ .append("CREATE TABLE ")
+ .append(tablePath.getSchemaAndTableName("\""))
+ .append(" (\n");
+
+ List<String> columnSqls =
+ columns.stream()
+ .map(column ->
CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
+ .collect(Collectors.toList());
+
+ // Add primary key directly in the create table statement
+ if (primaryKey != null
+ && primaryKey.getColumnNames() != null
+ && primaryKey.getColumnNames().size() > 0) {
+ columnSqls.add(buildPrimaryKeySql(primaryKey));
+ }
+
+ createTableSql.append(String.join(",\n", columnSqls));
+ createTableSql.append("\n)");
+
+ List<String> commentSqls =
+ columns.stream()
+ .filter(column ->
StringUtils.isNotBlank(column.getComment()))
+ .map(
+ column ->
+ buildColumnCommentSql(
+ column,
tablePath.getSchemaAndTableName("\"")))
+ .collect(Collectors.toList());
+
+ if (!commentSqls.isEmpty()) {
+ createTableSql.append(";\n");
+ createTableSql.append(String.join(";\n", commentSqls));
+ }
+
+ return createTableSql.toString();
+ }
+
+ private String buildColumnSql(Column column) {
+ StringBuilder columnSql = new StringBuilder();
+ columnSql.append("\"").append(column.getName()).append("\" ");
+
+ String columnType =
+ StringUtils.equalsIgnoreCase(DatabaseIdentifier.XUGU,
sourceCatalogName)
+ ? column.getSourceType()
+ :
XuguTypeConverter.INSTANCE.reconvert(column).getColumnType();
+ columnSql.append(columnType);
+
+ if (!column.isNullable()) {
+ columnSql.append(" NOT NULL");
+ }
+
+ return columnSql.toString();
+ }
+
+ private String buildPrimaryKeySql(PrimaryKey primaryKey) {
+ String randomSuffix = UUID.randomUUID().toString().replace("-",
"").substring(0, 4);
+ String columnNamesString =
+ primaryKey.getColumnNames().stream()
+ .map(columnName -> "\"" + columnName + "\"")
+ .collect(Collectors.joining(", "));
+
+ // In xugu database, the maximum length for an identifier is 30
characters.
+ String primaryKeyStr = primaryKey.getPrimaryKey();
+ if (primaryKeyStr.length() > 25) {
+ primaryKeyStr = primaryKeyStr.substring(0, 25);
+ }
+
+ return CatalogUtils.getFieldIde(
+ "CONSTRAINT "
+ + primaryKeyStr
+ + "_"
+ + randomSuffix
+ + " PRIMARY KEY ("
+ + columnNamesString
+ + ")",
+ fieldIde);
+ }
+
+ private String buildColumnCommentSql(Column column, String tableName) {
+ StringBuilder columnCommentSql = new StringBuilder();
+ columnCommentSql
+ .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ",
fieldIde))
+ .append(tableName)
+ .append(".");
+ columnCommentSql
+ .append(CatalogUtils.quoteIdentifier(column.getName(),
fieldIde, "\""))
+ .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+ .append(column.getComment())
+ .append("'");
+ return columnCommentSql.toString();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 3b1738afb2..2f6aabc502 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -38,4 +38,5 @@ public class DatabaseIdentifier {
public static final String VERTICA = "Vertica";
public static final String OCENABASE = "OceanBase";
public static final String TIDB = "TiDB";
+ public static final String XUGU = "XUGU";
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
new file mode 100644
index 0000000000..1ef617b393
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class XuguDialect implements JdbcDialect {
+
+ private static final int DEFAULT_XUGU_FETCH_SIZE = 500;
+ public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+ public XuguDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
+ public XuguDialect() {}
+
+ @Override
+ public String dialectName() {
+ return DatabaseIdentifier.XUGU;
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new XuguJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new XuguTypeMapper();
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ if (identifier.contains(".")) {
+ String[] parts = identifier.split("\\.");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < parts.length - 1; i++) {
+ sb.append("\"").append(parts[i]).append("\"").append(".");
+ }
+ return sb.append("\"")
+ .append(getFieldIde(parts[parts.length - 1], fieldIde))
+ .append("\"")
+ .toString();
+ }
+
+ return "\"" + getFieldIde(identifier, fieldIde) + "\"";
+ }
+
+ @Override
+ public String tableIdentifier(String database, String tableName) {
+ return quoteIdentifier(tableName);
+ }
+
+ @Override
+ public String extractTableName(TablePath tablePath) {
+ return tablePath.getSchemaAndTableName();
+ }
+
+ @Override
+ public TablePath parse(String tablePath) {
+ return TablePath.of(tablePath, true);
+ }
+
+ @Override
+ public String tableIdentifier(TablePath tablePath) {
+ return tablePath.getSchemaAndTableName();
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(
+ String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
+ List<String> nonUniqueKeyFields =
+ Arrays.stream(fieldNames)
+ .filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
+ .collect(Collectors.toList());
+ String valuesBinding =
+ Arrays.stream(fieldNames)
+ .map(fieldName -> ":" + fieldName + " " +
quoteIdentifier(fieldName))
+ .collect(Collectors.joining(", "));
+
+ String usingClause = String.format("SELECT %s FROM DUAL",
valuesBinding);
+ String onConditions =
+ Arrays.stream(uniqueKeyFields)
+ .map(
+ fieldName ->
+ String.format(
+ "TARGET.%s=SOURCE.%s",
+ quoteIdentifier(fieldName),
+ quoteIdentifier(fieldName)))
+ .collect(Collectors.joining(" AND "));
+ String updateSetClause =
+ nonUniqueKeyFields.stream()
+ .map(
+ fieldName ->
+ String.format(
+ "TARGET.%s=SOURCE.%s",
+ quoteIdentifier(fieldName),
+ quoteIdentifier(fieldName)))
+ .collect(Collectors.joining(", "));
+ String insertFields =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String insertValues =
+ Arrays.stream(fieldNames)
+ .map(fieldName -> "SOURCE." +
quoteIdentifier(fieldName))
+ .collect(Collectors.joining(", "));
+
+ String upsertSQL =
+ String.format(
+ " MERGE INTO %s TARGET"
+ + " USING (%s) SOURCE"
+ + " ON (%s) "
+ + " WHEN MATCHED THEN"
+ + " UPDATE SET %s"
+ + " WHEN NOT MATCHED THEN"
+ + " INSERT (%s) VALUES (%s)",
+ tableIdentifier(database, tableName),
+ usingClause,
+ onConditions,
+ updateSetClause,
+ insertFields,
+ insertValues);
+
+ return Optional.of(upsertSQL);
+ }
+
+ @Override
+ public PreparedStatement creatPreparedStatement(
+ Connection connection, String queryTemplate, int fetchSize) throws
SQLException {
+ PreparedStatement statement =
+ connection.prepareStatement(
+ queryTemplate, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ if (fetchSize > 0) {
+ statement.setFetchSize(fetchSize);
+ } else {
+ statement.setFetchSize(DEFAULT_XUGU_FETCH_SIZE);
+ }
+ return statement;
+ }
+
+ @Override
+ public Object queryNextChunkMax(
+ Connection connection,
+ JdbcSourceTable table,
+ String columnName,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ String quotedColumn = quoteIdentifier(columnName);
+ String sqlQuery;
+ if (StringUtils.isNotBlank(table.getQuery())) {
+ sqlQuery =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM (%s) WHERE %s >= ? ORDER
BY %s ASC "
+ + ") WHERE ROWNUM <= %s",
+ quotedColumn,
+ quotedColumn,
+ table.getQuery(),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ } else {
+ sqlQuery =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM %s WHERE %s >= ? ORDER
BY %s ASC "
+ + ") WHERE ROWNUM <= %s",
+ quotedColumn,
+ quotedColumn,
+ table.getTablePath().getSchemaAndTableName(),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ }
+
+ try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
+ ps.setObject(1, includedLowerBound);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format("No result returned after running
query [%s]", sqlQuery));
+ }
+ return rs.getObject(1);
+ }
+ }
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData(Connection conn, String
query)
+ throws SQLException {
+ PreparedStatement ps = conn.prepareStatement(query);
+ return ps.executeQuery().getMetaData();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
new file mode 100644
index 0000000000..0e489b728b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+/** Factory for {@link XuguDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class XuguDialectFactory implements JdbcDialectFactory {
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:xugu:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new XuguDialect();
+ }
+
+ @Override
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
+ return new XuguDialect(fieldIde);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguJdbcRowConverter.java
new file mode 100644
index 0000000000..4590761965
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguJdbcRowConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+public class XuguJdbcRowConverter extends AbstractJdbcRowConverter {
+
+ @Override
+ public String converterName() {
+ return DatabaseIdentifier.XUGU;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
new file mode 100644
index 0000000000..54a8805f3b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+// reference
+//
https://docs.xugudb.com/%E8%99%9A%E8%B0%B7%E6%95%B0%E6%8D%AE%E5%BA%93%E5%AF%B9%E5%A4%96%E5%8F%91%E5%B8%83/06%E5%8F%82%E8%80%83%E6%8C%87%E5%8D%97/SQL%E8%AF%AD%E6%B3%95%E5%8F%82%E8%80%83/%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B/%E6%A6%82%E8%BF%B0/
+@Slf4j
+@AutoService(TypeConverter.class)
+public class XuguTypeConverter implements TypeConverter<BasicTypeDefine> {
+ // ============================data types=====================
+ // -------------------------number----------------------------
+ public static final String XUGU_NUMERIC = "NUMERIC";
+ public static final String XUGU_NUMBER = "NUMBER";
+ public static final String XUGU_DECIMAL = "DECIMAL";
+ public static final String XUGU_INTEGER = "INTEGER";
+ public static final String XUGU_INT = "INT";
+ public static final String XUGU_BIGINT = "BIGINT";
+ public static final String XUGU_TINYINT = "TINYINT";
+ public static final String XUGU_SMALLINT = "SMALLINT";
+ public static final String XUGU_FLOAT = "FLOAT";
+ public static final String XUGU_DOUBLE = "DOUBLE";
+
+ // ----------------------------string-------------------------
+ public static final String XUGU_CHAR = "CHAR";
+ public static final String XUGU_NCHAR = "NCHAR";
+ public static final String XUGU_VARCHAR = "VARCHAR";
+ public static final String XUGU_VARCHAR2 = "VARCHAR2";
+ public static final String XUGU_CLOB = "CLOB";
+
+ // ------------------------------time-------------------------
+ public static final String XUGU_DATE = "DATE";
+ public static final String XUGU_TIME = "TIME";
+ public static final String XUGU_TIMESTAMP = "TIMESTAMP";
+ public static final String XUGU_DATETIME = "DATETIME";
+ public static final String XUGU_DATETIME_WITH_TIME_ZONE = "DATETIME WITH
TIME ZONE";
+ public static final String XUGU_TIME_WITH_TIME_ZONE = "TIME WITH TIME
ZONE";
+ public static final String XUGU_TIMESTAMP_WITH_TIME_ZONE = "TIMESTAMP WITH
TIME ZONE";
+
+ // ---------------------------binary---------------------------
+ public static final String XUGU_BINARY = "BINARY";
+ public static final String XUGU_BLOB = "BLOB";
+
+ // ---------------------------other---------------------------
+ public static final String XUGU_GUID = "GUID";
+ public static final String XUGU_BOOLEAN = "BOOLEAN";
+ public static final String XUGU_BOOL = "BOOL";
+ public static final String XUGU_JSON = "JSON";
+
+ public static final int MAX_PRECISION = 38;
+ public static final int DEFAULT_PRECISION = MAX_PRECISION;
+ public static final int MAX_SCALE = 38;
+ public static final int DEFAULT_SCALE = 18;
+ public static final int TIMESTAMP_DEFAULT_SCALE = 3;
+ public static final int MAX_TIMESTAMP_SCALE = 6;
+ public static final int MAX_TIME_SCALE = 3;
+ public static final long MAX_VARCHAR_LENGTH = 60000;
+ public static final long POWER_2_16 = (long) Math.pow(2, 16);
+ public static final long BYTES_2GB = (long) Math.pow(2, 31);
+ public static final long MAX_BINARY_LENGTH = POWER_2_16 - 4;
+ public static final XuguTypeConverter INSTANCE = new XuguTypeConverter();
+
+ @Override
+ public String identifier() {
+ return DatabaseIdentifier.XUGU;
+ }
+
+ @Override
+ public Column convert(BasicTypeDefine typeDefine) {
+ PhysicalColumn.PhysicalColumnBuilder builder =
+ PhysicalColumn.builder()
+ .name(typeDefine.getName())
+ .sourceType(typeDefine.getColumnType())
+ .nullable(typeDefine.isNullable())
+ .defaultValue(typeDefine.getDefaultValue())
+ .comment(typeDefine.getComment());
+
+ String xuguDataType = typeDefine.getDataType().toUpperCase();
+ switch (xuguDataType) {
+ case XUGU_BOOLEAN:
+ case XUGU_BOOL:
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ break;
+ case XUGU_TINYINT:
+ builder.dataType(BasicType.BYTE_TYPE);
+ break;
+ case XUGU_SMALLINT:
+ builder.dataType(BasicType.SHORT_TYPE);
+ break;
+ case XUGU_INT:
+ case XUGU_INTEGER:
+ builder.dataType(BasicType.INT_TYPE);
+ break;
+ case XUGU_BIGINT:
+ builder.dataType(BasicType.LONG_TYPE);
+ break;
+ case XUGU_FLOAT:
+ builder.dataType(BasicType.FLOAT_TYPE);
+ break;
+ case XUGU_DOUBLE:
+ builder.dataType(BasicType.DOUBLE_TYPE);
+ break;
+ case XUGU_NUMBER:
+ case XUGU_DECIMAL:
+ case XUGU_NUMERIC:
+ DecimalType decimalType;
+ if (typeDefine.getPrecision() != null &&
typeDefine.getPrecision() > 0) {
+ decimalType =
+ new DecimalType(
+ typeDefine.getPrecision().intValue(),
typeDefine.getScale());
+ } else {
+ decimalType = new DecimalType(DEFAULT_PRECISION,
DEFAULT_SCALE);
+ }
+ builder.dataType(decimalType);
+ builder.columnLength(Long.valueOf(decimalType.getPrecision()));
+ builder.scale(decimalType.getScale());
+ break;
+
+ case XUGU_CHAR:
+ case XUGU_NCHAR:
+ builder.dataType(BasicType.STRING_TYPE);
+ if (typeDefine.getLength() == null || typeDefine.getLength()
<= 0) {
+
builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ break;
+ case XUGU_VARCHAR:
+ case XUGU_VARCHAR2:
+ builder.dataType(BasicType.STRING_TYPE);
+ if (typeDefine.getLength() == null || typeDefine.getLength()
<= 0) {
+
builder.columnLength(TypeDefineUtils.charTo4ByteLength(MAX_VARCHAR_LENGTH));
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ break;
+ case XUGU_CLOB:
+ builder.dataType(BasicType.STRING_TYPE);
+ builder.columnLength(BYTES_2GB - 1);
+ break;
+ case XUGU_JSON:
+ case XUGU_GUID:
+ builder.dataType(BasicType.STRING_TYPE);
+ break;
+ case XUGU_BINARY:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ builder.columnLength(MAX_BINARY_LENGTH);
+ break;
+ case XUGU_BLOB:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ builder.columnLength(BYTES_2GB - 1);
+ break;
+ case XUGU_DATE:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+ break;
+ case XUGU_TIME:
+ case XUGU_TIME_WITH_TIME_ZONE:
+ builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
+ break;
+ case XUGU_DATETIME:
+ case XUGU_DATETIME_WITH_TIME_ZONE:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+ break;
+ case XUGU_TIMESTAMP:
+ case XUGU_TIMESTAMP_WITH_TIME_ZONE:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+ if (typeDefine.getScale() == null) {
+ builder.scale(TIMESTAMP_DEFAULT_SCALE);
+ } else {
+ builder.scale(typeDefine.getScale());
+ }
+ break;
+ default:
+ throw CommonError.convertToSeaTunnelTypeError(
+ DatabaseIdentifier.XUGU, xuguDataType,
typeDefine.getName());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public BasicTypeDefine reconvert(Column column) {
+ BasicTypeDefine.BasicTypeDefineBuilder builder =
+ BasicTypeDefine.builder()
+ .name(column.getName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .defaultValue(column.getDefaultValue());
+ switch (column.getDataType().getSqlType()) {
+ case BOOLEAN:
+ builder.columnType(XUGU_BOOLEAN);
+ builder.dataType(XUGU_BOOLEAN);
+ break;
+ case TINYINT:
+ builder.columnType(XUGU_TINYINT);
+ builder.dataType(XUGU_TINYINT);
+ break;
+ case SMALLINT:
+ builder.columnType(XUGU_SMALLINT);
+ builder.dataType(XUGU_SMALLINT);
+ break;
+ case INT:
+ builder.columnType(XUGU_INTEGER);
+ builder.dataType(XUGU_INTEGER);
+ break;
+ case BIGINT:
+ builder.columnType(XUGU_BIGINT);
+ builder.dataType(XUGU_BIGINT);
+ break;
+ case FLOAT:
+ builder.columnType(XUGU_FLOAT);
+ builder.dataType(XUGU_FLOAT);
+ break;
+ case DOUBLE:
+ builder.columnType(XUGU_DOUBLE);
+ builder.dataType(XUGU_DOUBLE);
+ break;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) column.getDataType();
+ long precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ if (precision <= 0) {
+ precision = DEFAULT_PRECISION;
+ scale = DEFAULT_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which is precision less than 0, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ precision,
+ scale);
+ } else if (precision > MAX_PRECISION) {
+ scale = (int) Math.max(0, scale - (precision -
MAX_PRECISION));
+ precision = MAX_PRECISION;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which exceeds the maximum precision of
{}, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ MAX_PRECISION,
+ precision,
+ scale);
+ }
+ if (scale < 0) {
+ scale = 0;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which is scale less than 0, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ precision,
+ scale);
+ } else if (scale > MAX_SCALE) {
+ scale = MAX_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which exceeds the maximum scale of {}, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ MAX_SCALE,
+ precision,
+ scale);
+ }
+ builder.columnType(String.format("%s(%s,%s)", XUGU_NUMERIC,
precision, scale));
+ builder.dataType(XUGU_NUMERIC);
+ builder.precision(precision);
+ builder.scale(scale);
+ break;
+ case BYTES:
+ if (column.getColumnLength() == null ||
column.getColumnLength() <= 0) {
+ builder.columnType(XUGU_BLOB);
+ builder.dataType(XUGU_BLOB);
+ } else if (column.getColumnLength() <= MAX_BINARY_LENGTH) {
+ builder.columnType(XUGU_BINARY);
+ builder.dataType(XUGU_BINARY);
+ } else {
+ builder.columnType(XUGU_BLOB);
+ builder.dataType(XUGU_BLOB);
+ }
+ break;
+ case STRING:
+ if (column.getColumnLength() == null ||
column.getColumnLength() <= 0) {
+ builder.columnType(String.format("%s(%s)", XUGU_VARCHAR,
MAX_VARCHAR_LENGTH));
+ builder.dataType(XUGU_VARCHAR);
+ } else if (column.getColumnLength() <= MAX_VARCHAR_LENGTH) {
+ builder.columnType(
+ String.format("%s(%s)", XUGU_VARCHAR,
column.getColumnLength()));
+ builder.dataType(XUGU_VARCHAR);
+ } else {
+ builder.columnType(XUGU_CLOB);
+ builder.dataType(XUGU_CLOB);
+ }
+ break;
+ case DATE:
+ builder.columnType(XUGU_DATE);
+ builder.dataType(XUGU_DATE);
+ break;
+ case TIME:
+ builder.dataType(XUGU_TIME);
+ if (column.getScale() != null && column.getScale() > 0) {
+ Integer timeScale = column.getScale();
+ if (timeScale > MAX_TIME_SCALE) {
+ timeScale = MAX_TIME_SCALE;
+ log.warn(
+ "The time column {} type time({}) is out of
range, "
+ + "which exceeds the maximum scale of
{}, "
+ + "it will be converted to time({})",
+ column.getName(),
+ column.getScale(),
+ MAX_SCALE,
+ timeScale);
+ }
+ builder.columnType(String.format("%s(%s)", XUGU_TIME,
timeScale));
+ builder.scale(timeScale);
+ } else {
+ builder.columnType(XUGU_TIME);
+ }
+ break;
+ case TIMESTAMP:
+ if (column.getScale() == null || column.getScale() <= 0) {
+ builder.columnType(XUGU_TIMESTAMP);
+ } else {
+ int timestampScale = column.getScale();
+ if (column.getScale() > MAX_TIMESTAMP_SCALE) {
+ timestampScale = MAX_TIMESTAMP_SCALE;
+ log.warn(
+ "The timestamp column {} type timestamp({}) is
out of range, "
+ + "which exceeds the maximum scale of
{}, "
+ + "it will be converted to
timestamp({})",
+ column.getName(),
+ column.getScale(),
+ MAX_TIMESTAMP_SCALE,
+ timestampScale);
+ }
+ builder.columnType(String.format("TIMESTAMP(%s)",
timestampScale));
+ builder.scale(timestampScale);
+ }
+ builder.dataType(XUGU_TIMESTAMP);
+ break;
+ default:
+ throw CommonError.convertToConnectorTypeError(
+ DatabaseIdentifier.XUGU,
+ column.getDataType().getSqlType().name(),
+ column.getName());
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeMapper.java
new file mode 100644
index 0000000000..e517f56af0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeMapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+@Slf4j
+public class XuguTypeMapper implements JdbcDialectTypeMapper {
+
+ @Override
+ public Column mappingColumn(BasicTypeDefine typeDefine) {
+ return XuguTypeConverter.INSTANCE.convert(typeDefine);
+ }
+
+ @Override
+ public Column mappingColumn(ResultSetMetaData metadata, int colIndex)
throws SQLException {
+ String columnName = metadata.getColumnLabel(colIndex);
+ String nativeType = metadata.getColumnTypeName(colIndex);
+ int isNullable = metadata.isNullable(colIndex);
+ long precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+ if (Arrays.asList("CHAR", "NCHAR", "VARCHAR",
"VARCHAR2").contains(nativeType)) {
+ long octetByteLength =
TypeDefineUtils.charTo4ByteLength(precision);
+ precision = octetByteLength;
+ }
+
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name(columnName)
+ .columnType(nativeType)
+ .dataType(nativeType)
+ .nullable(isNullable ==
ResultSetMetaData.columnNullable)
+ .length(precision)
+ .precision(precision)
+ .scale(scale)
+ .build();
+ return mappingColumn(typeDefine);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverterTest.java
new file mode 100644
index 0000000000..9dfd7079df
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverterTest.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter.BYTES_2GB;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter.MAX_BINARY_LENGTH;
+
+public class XuguTypeConverterTest {
+ @Test
+ public void testConvertUnsupported() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("aaa").dataType("aaa").build();
+ try {
+ XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.fail();
+ } catch (SeaTunnelRuntimeException e) {
+ // ignore
+ } catch (Throwable e) {
+ Assertions.fail();
+ }
+ }
+
+ @Test
+ public void testReconvertUnsupported() {
+ Column column =
+ PhysicalColumn.of(
+ "test",
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE),
+ (Long) null,
+ true,
+ null,
+ null);
+ try {
+ XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.fail();
+ } catch (SeaTunnelRuntimeException e) {
+ // ignore
+ } catch (Throwable e) {
+ Assertions.fail();
+ }
+ }
+
+ @Test
+ public void testConvertBoolean() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("bool")
+ .dataType("boolean")
+ .nullable(true)
+ .defaultValue("1")
+ .comment("test")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ Assertions.assertEquals(typeDefine.isNullable(), column.isNullable());
+ Assertions.assertEquals(typeDefine.getDefaultValue(),
column.getDefaultValue());
+ Assertions.assertEquals(typeDefine.getComment(), column.getComment());
+ }
+
+ @Test
+ public void testConvertTinyint() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("tinyint")
+ .dataType("tinyint")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.BYTE_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertSmallint() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("smallint")
+ .dataType("smallint")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertInt() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("int").dataType("int").build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertBigint() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("bigint")
+ .dataType("bigint")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertFloat() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("float")
+ .dataType("float")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDouble() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("double")
+ .dataType("double")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDecimal() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("numeric(38,2)")
+ .dataType("numeric")
+ .precision(38L)
+ .scale(2)
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 2), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("numeric")
+ .dataType("numeric")
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertChar() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("char").dataType("char").build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(4, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("char(10)")
+ .dataType("char")
+ .length(10L)
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(10, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertVarchar() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("varchar")
+ .dataType("varchar")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(240000, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("varchar(10)")
+ .dataType("varchar")
+ .length(10L)
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(10, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("varchar2(20)")
+ .dataType("varchar2")
+ .length(20L)
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(20, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertOtherString() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("clob").dataType("clob").build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(BYTES_2GB - 1, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("json").dataType("json").build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertEquals(null, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertBinary() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("blob").dataType("blob").build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE,
column.getDataType());
+ Assertions.assertEquals(BYTES_2GB - 1, column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertDate() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("date").dataType("date").build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertTime() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("time").dataType("time").build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("time with time zone")
+ .dataType("time with time zone")
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testConvertTimestamp() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("datetime")
+ .dataType("datetime")
+ .build();
+ Column column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("datetime with time zone")
+ .dataType("datetime with time zone")
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("timestamp")
+ .dataType("timestamp")
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(3, column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("timestamp(6)")
+ .dataType("timestamp")
+ .scale(6)
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("timestamp with time zone")
+ .dataType("timestamp with time zone")
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(3, column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("timestamp(3) with time zone")
+ .dataType("timestamp with time zone")
+ .scale(3)
+ .build();
+ column = XuguTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
+ @Test
+ public void testReconvertBoolean() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.BOOLEAN_TYPE)
+ .nullable(true)
+ .defaultValue(true)
+ .comment("test")
+ .build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BOOLEAN,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BOOLEAN,
typeDefine.getDataType());
+ Assertions.assertEquals(column.isNullable(), typeDefine.isNullable());
+ Assertions.assertEquals(column.getDefaultValue(),
typeDefine.getDefaultValue());
+ Assertions.assertEquals(column.getComment(), typeDefine.getComment());
+ }
+
+ @Test
+ public void testReconvertByte() {
+ Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TINYINT,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TINYINT,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertShort() {
+ Column column =
+
PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_SMALLINT,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_SMALLINT,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertInt() {
+ Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_INTEGER,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_INTEGER,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertLong() {
+ Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BIGINT,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BIGINT,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertFloat() {
+ Column column =
+
PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_FLOAT,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_FLOAT,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDouble() {
+ Column column =
+
PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_DOUBLE,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_DOUBLE,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDecimal() {
+ Column column =
+ PhysicalColumn.builder().name("test").dataType(new
DecimalType(0, 0)).build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format(
+ "%s(%s,%s)",
+ XuguTypeConverter.XUGU_NUMERIC,
+ XuguTypeConverter.DEFAULT_PRECISION,
+ XuguTypeConverter.DEFAULT_SCALE),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_NUMERIC,
typeDefine.getDataType());
+
+ column = PhysicalColumn.builder().name("test").dataType(new
DecimalType(10, 2)).build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s,%s)", XuguTypeConverter.XUGU_NUMERIC, 10,
2),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_NUMERIC,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertBytes() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(PrimitiveByteArrayType.INSTANCE)
+ .columnLength(null)
+ .build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BLOB,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BLOB,
typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(PrimitiveByteArrayType.INSTANCE)
+ .columnLength(MAX_BINARY_LENGTH)
+ .build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BINARY,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_BINARY,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertString() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(null)
+ .build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals("VARCHAR(60000)", typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_VARCHAR,
typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(1L)
+ .build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", XuguTypeConverter.XUGU_VARCHAR,
column.getColumnLength()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_VARCHAR,
typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(60000L)
+ .build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", XuguTypeConverter.XUGU_VARCHAR,
column.getColumnLength()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_VARCHAR,
typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(60001L)
+ .build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_CLOB,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_CLOB,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDate() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_DATE,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_DATE,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertTime() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_TIME_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TIME,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TIME,
typeDefine.getDataType());
+ }
+
+ @Test
+ public void testReconvertDatetime() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ .build();
+
+ BasicTypeDefine typeDefine =
XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TIMESTAMP,
typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TIMESTAMP,
typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ .scale(3)
+ .build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", XuguTypeConverter.XUGU_TIMESTAMP,
column.getScale()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(XuguTypeConverter.XUGU_TIMESTAMP,
typeDefine.getDataType());
+ Assertions.assertEquals(column.getScale(), typeDefine.getScale());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ .scale(6)
+ .build();
+
+ typeDefine = XuguTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", XuguTypeConverter.XUGU_TIMESTAMP, 6),
+ typeDefine.getColumnType());
+ }
+}
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 5c1171a82d..1da0d3014b 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -203,6 +203,7 @@
<include>com.teradata.jdbc:terajdbc4:jar</include>
<include>com.amazon.redshift:redshift-jdbc42:jar</include>
<include>net.snowflake.snowflake-jdbc:jar</include>
+ <include>com.xugudb:xugu-jdbc:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
index 09d511594f..af4c61d5b6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
@@ -91,6 +91,11 @@
<artifactId>vertica-jdbc</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.xugudb</groupId>
+ <artifactId>xugu-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
new file mode 100644
index 0000000000..5fdae0ad93
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu.XuguCatalog;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class JdbcXuguIT extends AbstractJdbcIT {
+
+ private static final String XUGU_IMAGE = "xugudb/xugudb:v12";
+ private static final String XUGU_CONTAINER_HOST = "e2e_xugudb";
+ private static final String XUGU_SCHEMA = "SYSDBA";
+ private static final String XUGU_DATABASE = "SYSTEM";
+ private static final String XUGU_SOURCE = "e2e_table_source";
+ private static final String XUGU_SINK = "e2e_table_sink";
+ private static final String CATALOG_DATABASE = "catalog_database";
+ private static final String CATALOG_TABLE = "e2e_table_catalog";
+ private static final String XUGU_USERNAME = "SYSDBA";
+ private static final String XUGU_PASSWORD = "SYSDBA";
+ private static final int XUGU_PORT = 5138;
+ private static final String XUGU_URL = "jdbc:xugu://" + HOST + ":%s/%s";
+
+ private static final String DRIVER_CLASS = "com.xugu.cloudjdbc.Driver";
+
+ private static final List<String> CONFIG_FILE =
+ Lists.newArrayList(
+ "/jdbc_xugu_source_and_upsert_sink.conf",
"/jdbc_xugu_source_and_sink.conf");
+ private static final String CREATE_SQL =
+ "create table if not exists %s"
+ + "(\n"
+ + " XUGU_NUMERIC NUMERIC(10,2),\n"
+ + " XUGU_NUMBER NUMBER(10,2),\n"
+ + " XUGU_INTEGER INTEGER,\n"
+ + " XUGU_INT INT,\n"
+ + " XUGU_BIGINT BIGINT,\n"
+ + " XUGU_TINYINT TINYINT,\n"
+ + " XUGU_SMALLINT SMALLINT,\n"
+ + " XUGU_FLOAT FLOAT,\n"
+ + " XUGU_DOUBLE DOUBLE,\n"
+ + " XUGU_CHAR CHAR,\n"
+ + " XUGU_NCHAR NCHAR,\n"
+ + " XUGU_VARCHAR VARCHAR,\n"
+ + " XUGU_VARCHAR2 VARCHAR2,\n"
+ + " XUGU_CLOB CLOB,\n"
+ + " XUGU_DATE DATE,\n"
+ + " XUGU_TIME TIME,\n"
+ + " XUGU_TIMESTAMP TIMESTAMP,\n"
+ + " XUGU_DATETIME DATETIME,\n"
+ + " XUGU_TIME_WITH_TIME_ZONE TIME WITH TIME
ZONE,\n"
+ + " XUGU_TIMESTAMP_WITH_TIME_ZONE TIMESTAMP WITH TIME
ZONE,\n"
+ + " XUGU_BINARY BINARY,\n"
+ + " XUGU_BLOB BLOB,\n"
+ + " XUGU_GUID GUID,\n"
+ + " XUGU_BOOLEAN BOOLEAN,\n"
+ + " CONSTRAINT \"XUGU_PK\" PRIMARY KEY(XUGU_INT)"
+ + ")";
+ private static final String[] fieldNames =
+ new String[] {
+ "XUGU_NUMERIC",
+ "XUGU_NUMBER",
+ "XUGU_INTEGER",
+ "XUGU_INT",
+ "XUGU_BIGINT",
+ "XUGU_TINYINT",
+ "XUGU_SMALLINT",
+ "XUGU_FLOAT",
+ "XUGU_DOUBLE",
+ "XUGU_CHAR",
+ "XUGU_NCHAR",
+ "XUGU_VARCHAR",
+ "XUGU_VARCHAR2",
+ "XUGU_CLOB",
+ "XUGU_DATE",
+ "XUGU_TIME",
+ "XUGU_TIMESTAMP",
+ "XUGU_DATETIME",
+ "XUGU_TIME_WITH_TIME_ZONE",
+ "XUGU_TIMESTAMP_WITH_TIME_ZONE",
+ "XUGU_BINARY",
+ "XUGU_BLOB",
+ "XUGU_GUID",
+ "XUGU_BOOLEAN"
+ };
+
+ @Override
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl = String.format(XUGU_URL, XUGU_PORT, XUGU_DATABASE);
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+
+ String insertSql = insertTable(XUGU_SCHEMA, XUGU_SOURCE, fieldNames);
+
+ return JdbcCase.builder()
+ .dockerImage(XUGU_IMAGE)
+ .networkAliases(XUGU_CONTAINER_HOST)
+ .containerEnv(containerEnv)
+ .driverClass(DRIVER_CLASS)
+ .host(HOST)
+ .port(XUGU_PORT)
+ .localPort(XUGU_PORT)
+ .jdbcTemplate(XUGU_URL)
+ .jdbcUrl(jdbcUrl)
+ .userName(XUGU_USERNAME)
+ .password(XUGU_PASSWORD)
+ .schema(XUGU_SCHEMA)
+ .database(XUGU_DATABASE)
+ .sourceTable(XUGU_SOURCE)
+ .sinkTable(XUGU_SINK)
+ .catalogDatabase(CATALOG_DATABASE)
+ .catalogSchema(XUGU_SCHEMA)
+ .catalogTable(CATALOG_TABLE)
+ .createSql(CREATE_SQL)
+ .configFile(CONFIG_FILE)
+ .insertSql(insertSql)
+ .testData(testDataSet)
+ .build();
+ }
+
+ @Override
+ void compareResult(String executeKey) {
+ defaultCompare(executeKey, fieldNames, "XUGU_INT");
+ }
+
+ @Override
+ String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar";
+ }
+
+ @Override
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ BigDecimal.valueOf(1.12),
+ BigDecimal.valueOf(i, 2),
+ i,
+ i,
+ Long.parseLong("1"),
+ i,
+ i,
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ Date.valueOf(LocalDate.now()),
+ Time.valueOf(LocalTime.now()),
+ new Timestamp(System.currentTimeMillis()),
+ Timestamp.valueOf(LocalDateTime.now()),
+ Time.valueOf(LocalTime.now()),
+ new Timestamp(System.currentTimeMillis()),
+ null,
+ null,
+ null,
+ false
+ });
+ rows.add(row);
+ }
+
+ return Pair.of(fieldNames, rows);
+ }
+
+ @Override
+ protected GenericContainer<?> initContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(XUGU_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(XUGU_CONTAINER_HOST)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(XUGU_IMAGE)));
+ container.setPortBindings(Lists.newArrayList(String.format("%s:%s",
XUGU_PORT, XUGU_PORT)));
+
+ return container;
+ }
+
+ @Override
+ public String quoteIdentifier(String field) {
+ return "\"" + field + "\"";
+ }
+
+ @Override
+ protected void clearTable(String database, String schema, String table) {
+ clearTable(schema, table);
+ }
+
+ @Override
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(schema, table);
+ }
+
+ @Override
+ protected void initCatalog() {
+ String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST,
dbServer.getHost());
+ catalog =
+ new XuguCatalog(
+ "xugu",
+ jdbcCase.getUserName(),
+ jdbcCase.getPassword(),
+ JdbcUrlUtil.getUrlInfo(jdbcUrl),
+ XUGU_SCHEMA);
+ catalog.open();
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_sink.conf
new file mode 100644
index 0000000000..09154809f3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_sink.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ jdbc {
+ url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM?batch_mode=false"
+ driver = "com.xugu.cloudjdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "SYSDBA"
+ password = "SYSDBA"
+ query = "select * from e2e_table_source;"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM?batch_mode=false"
+ driver = "com.xugu.cloudjdbc.Driver"
+ user = "SYSDBA"
+ password = "SYSDBA"
+ query = """INSERT INTO SYSDBA.e2e_table_sink
+ (XUGU_NUMERIC, XUGU_NUMBER, XUGU_INTEGER, XUGU_INT, XUGU_BIGINT,
XUGU_TINYINT, XUGU_SMALLINT, XUGU_FLOAT, XUGU_DOUBLE, XUGU_CHAR, XUGU_NCHAR,
XUGU_VARCHAR, XUGU_VARCHAR2, XUGU_CLOB, XUGU_DATE, XUGU_TIME, XUGU_TIMESTAMP,
XUGU_DATETIME, XUGU_TIME_WITH_TIME_ZONE, XUGU_TIMESTAMP_WITH_TIME_ZONE,
XUGU_BINARY, XUGU_BLOB, XUGU_GUID, XUGU_BOOLEAN)
+ VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?);"""
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_upsert_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_upsert_sink.conf
new file mode 100644
index 0000000000..669118f166
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_xugu_source_and_upsert_sink.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ jdbc {
+ url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM"
+ driver = "com.xugu.cloudjdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "SYSDBA"
+ password = "SYSDBA"
+ query = "select * from e2e_table_source;"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:xugu://e2e_xugudb:5138/SYSTEM?batch_mode=false"
+ driver = "com.xugu.cloudjdbc.Driver"
+ user = "SYSDBA"
+ password = "SYSDBA"
+ generate_sink_sql = true
+ primary_keys = ["XUGU_INT"]
+ table = "SYSDBA.e2e_table_sink"
+ database = "SYSTEM"
+ }
+}