This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3130ae089e [Fix][Connector-V2][OceanBase] Remove OceanBase catalog's
dependency on mysql driver (#7311)
3130ae089e is described below
commit 3130ae089e65bc71fa61784fad7d282f92aeadaf
Author: xxsc0529 <[email protected]>
AuthorDate: Fri Aug 9 20:02:06 2024 +0800
[Fix][Connector-V2][OceanBase] Remove OceanBase catalog's dependency on
mysql driver (#7311)
---
.../catalog/oceanbase/OceanBaseCatalogFactory.java | 5 +
.../catalog/oceanbase/OceanBaseMySqlCatalog.java | 193 ++++++-
.../OceanBaseMysqlCreateTableSqlBuilder.java | 271 ++++++++++
.../dialect/oceanbase/OceanBaseDialectFactory.java | 3 +-
.../oceanbase/OceanBaseMySqlTypeConverter.java | 513 +++++++++++++++++++
.../oceanbase/OceanBaseMySqlTypeMapper.java | 72 +++
.../dialect/oceanbase/OceanBaseMysqlDialect.java | 290 +++++++++++
...ry.java => OceanBaseMysqlJdbcRowConverter.java} | 37 +-
.../dialect/oceanbase/OceanBaseMysqlType.java | 567 +++++++++++++++++++++
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 2 +
.../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 8 +-
11 files changed, 1927 insertions(+), 34 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
index 58dfa5b884..01d035e167 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
@@ -31,6 +31,9 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.auto.service.AutoService;
import java.util.Optional;
@@ -38,6 +41,8 @@ import java.util.Optional;
@AutoService(Factory.class)
public class OceanBaseCatalogFactory implements CatalogFactory {
+ private static final Logger log =
LoggerFactory.getLogger(OceanBaseCatalogFactory.class);
+
@Override
public String factoryIdentifier() {
return DatabaseIdentifier.OCENABASE;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 58cdb5c413..08aa0faea0 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -17,10 +17,44 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
-public class OceanBaseMySqlCatalog extends MySqlCatalog {
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
+@Slf4j
+public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog {
+
+ private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+ "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA =
'%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";
+
+ private static final String SELECT_DATABASE_EXISTS =
+ "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE
SCHEMA_NAME = '%s'";
+
+ private static final String SELECT_TABLE_EXISTS =
+ "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables
WHERE table_schema = '%s' AND table_name = '%s'";
static {
SYS_DATABASES.clear();
@@ -32,8 +66,161 @@ public class OceanBaseMySqlCatalog extends MySqlCatalog {
SYS_DATABASES.add("SYS");
}
+ private OceanBaseMySqlTypeConverter typeConverter;
+
public OceanBaseMySqlCatalog(
String catalogName, String username, String pwd,
JdbcUrlUtil.UrlInfo urlInfo) {
- super(catalogName, username, pwd, urlInfo);
+ super(catalogName, username, pwd, urlInfo, null);
+ this.typeConverter = new OceanBaseMySqlTypeConverter();
+ }
+
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(SELECT_DATABASE_EXISTS, databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ SELECT_TABLE_EXISTS, tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ @Override
+ protected String getListDatabaseSql() {
+ return "SHOW DATABASES;";
+ }
+
+ @Override
+ protected String getListTableSql(String databaseName) {
+ return "SHOW TABLES;";
+ }
+
+ @Override
+ protected String getTableName(ResultSet rs) throws SQLException {
+ return rs.getString(1);
+ }
+
+ @Override
+ protected String getTableName(TablePath tablePath) {
+ return tablePath.getTableName();
+ }
+
+ @Override
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ return String.format(
+ SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ @Override
+ protected TableIdentifier getTableIdentifier(TablePath tablePath) {
+ return TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ @Override
+ protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ List<ConstraintKey> indexList =
+ super.getConstraintKeys(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext();
) {
+ ConstraintKey index = it.next();
+ if
(ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType())
+ && "PRIMARY".equals(index.getConstraintName())) {
+ it.remove();
+ }
+ }
+ return indexList;
+ }
+
+ @Override
+ protected Column buildColumn(ResultSet resultSet) throws SQLException {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ // e.g. tinyint(1) unsigned
+ String columnType = resultSet.getString("COLUMN_TYPE");
+ // e.g. tinyint
+ String dataType = resultSet.getString("DATA_TYPE").toUpperCase();
+ String comment = resultSet.getString("COLUMN_COMMENT");
+ Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
+ String isNullableStr = resultSet.getString("IS_NULLABLE");
+ boolean isNullable = isNullableStr.equals("YES");
+ // e.g. `decimal(10, 2)` is 10
+ long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
+ // e.g. `decimal(10, 2)` is 2
+ int numberScale = resultSet.getInt("NUMERIC_SCALE");
+ // e.g. `varchar(10)` is 40
+ long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
+ // e.g. `timestamp(3)` is 3
+ // int timePrecision =
+ // MySqlVersion.V_5_5.equals(version) ? 0 :
+ // resultSet.getInt("DATETIME_PRECISION");
+ int timePrecision = resultSet.getInt("DATETIME_PRECISION");
+ Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength >
0));
+ Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));
+
+ OceanBaseMysqlType oceanbaseMysqlType =
OceanBaseMysqlType.getByName(columnType);
+ boolean unsigned =
columnType.toLowerCase(Locale.ROOT).contains("unsigned");
+
+ BasicTypeDefine<OceanBaseMysqlType> typeDefine =
+ BasicTypeDefine.<OceanBaseMysqlType>builder()
+ .name(columnName)
+ .columnType(columnType)
+ .dataType(dataType)
+ .nativeType(oceanbaseMysqlType)
+ .unsigned(unsigned)
+ .length(Math.max(charOctetLength, numberPrecision))
+ .precision(numberPrecision)
+ .scale(Math.max(numberScale, timePrecision))
+ .nullable(isNullable)
+ .defaultValue(defaultValue)
+ .comment(comment)
+ .build();
+ return typeConverter.convert(typeDefine);
+ }
+
+ @Override
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ return OceanBaseMysqlCreateTableSqlBuilder.builder(tablePath, table,
typeConverter)
+ .build(table.getCatalogName());
+ }
+
+ @Override
+ protected String getDropTableSql(TablePath tablePath) {
+ return String.format(
+ "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ @Override
+ protected String getCreateDatabaseSql(String databaseName) {
+ return String.format("CREATE DATABASE `%s`;", databaseName);
+ }
+
+ @Override
+ protected String getDropDatabaseSql(String databaseName) {
+ return String.format("DROP DATABASE `%s`;", databaseName);
+ }
+
+ @Override
+ public CatalogTable getTable(String sqlQuery) throws SQLException {
+ Connection defaultConnection = getConnection(defaultUrl);
+ Statement statement = defaultConnection.createStatement();
+ ResultSetMetaData metaData =
statement.executeQuery(sqlQuery).getMetaData();
+ return CatalogUtils.getCatalogTable(
+ metaData, new OceanBaseMySqlTypeMapper(typeConverter),
sqlQuery);
+ }
+
+ @Override
+ protected String getTruncateTableSql(TablePath tablePath) throws
CatalogException {
+ return String.format(
+ "TRUNCATE TABLE `%s`.`%s`;", tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ public String getExistDataSql(TablePath tablePath) {
+ return String.format(
+ "SELECT * FROM `%s`.`%s` LIMIT 1;",
+ tablePath.getDatabaseName(), tablePath.getTableName());
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..bc3413dbd8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.SqlType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class OceanBaseMysqlCreateTableSqlBuilder {
+
+ private final String tableName;
+ private List<Column> columns;
+
+ private String comment;
+
+ private String engine;
+ private String charset;
+ private String collate;
+
+ private PrimaryKey primaryKey;
+
+ private List<ConstraintKey> constraintKeys;
+
+ private String fieldIde;
+
+ private final OceanBaseMySqlTypeConverter typeConverter;
+
+ private OceanBaseMysqlCreateTableSqlBuilder(
+ String tableName, OceanBaseMySqlTypeConverter typeConverter) {
+ checkNotNull(tableName, "tableName must not be null");
+ this.tableName = tableName;
+ this.typeConverter = typeConverter;
+ }
+
+ public static OceanBaseMysqlCreateTableSqlBuilder builder(
+ TablePath tablePath,
+ CatalogTable catalogTable,
+ OceanBaseMySqlTypeConverter typeConverter) {
+ checkNotNull(tablePath, "tablePath must not be null");
+ checkNotNull(catalogTable, "catalogTable must not be null");
+
+ TableSchema tableSchema = catalogTable.getTableSchema();
+ checkNotNull(tableSchema, "tableSchema must not be null");
+
+ return new
OceanBaseMysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter)
+ .comment(catalogTable.getComment())
+ // todo: set charset and collate
+ .engine(null)
+ .charset(null)
+ .primaryKey(tableSchema.getPrimaryKey())
+ .constraintKeys(tableSchema.getConstraintKeys())
+ .addColumn(tableSchema.getColumns())
+ .fieldIde(catalogTable.getOptions().get("fieldIde"));
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder addColumn(List<Column> columns)
{
+ checkArgument(CollectionUtils.isNotEmpty(columns), "columns must not
be empty");
+ this.columns = columns;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder primaryKey(PrimaryKey
primaryKey) {
+ this.primaryKey = primaryKey;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder fieldIde(String fieldIde) {
+ this.fieldIde = fieldIde;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder
constraintKeys(List<ConstraintKey> constraintKeys) {
+ this.constraintKeys = constraintKeys;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder engine(String engine) {
+ this.engine = engine;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder charset(String charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder collate(String collate) {
+ this.collate = collate;
+ return this;
+ }
+
+ public OceanBaseMysqlCreateTableSqlBuilder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public String build(String catalogName) {
+ List<String> sqls = new ArrayList<>();
+ sqls.add(
+ String.format(
+ "CREATE TABLE %s (\n%s\n)",
+ CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"),
+ buildColumnsIdentifySql(catalogName)));
+ if (engine != null) {
+ sqls.add("ENGINE = " + engine);
+ }
+ if (charset != null) {
+ sqls.add("DEFAULT CHARSET = " + charset);
+ }
+ if (collate != null) {
+ sqls.add("COLLATE = " + collate);
+ }
+ if (comment != null) {
+ sqls.add("COMMENT = '" + comment + "'");
+ }
+ return String.join(" ", sqls) + ";";
+ }
+
+ private String buildColumnsIdentifySql(String catalogName) {
+ List<String> columnSqls = new ArrayList<>();
+ Map<String, String> columnTypeMap = new HashMap<>();
+ for (Column column : columns) {
+ columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName,
columnTypeMap));
+ }
+ if (primaryKey != null) {
+ columnSqls.add("\t" + buildPrimaryKeySql());
+ }
+ if (CollectionUtils.isNotEmpty(constraintKeys)) {
+ for (ConstraintKey constraintKey : constraintKeys) {
+ if (StringUtils.isBlank(constraintKey.getConstraintName())) {
+ continue;
+ }
+ String constraintKeyStr = buildConstraintKeySql(constraintKey,
columnTypeMap);
+ if (StringUtils.isNotBlank(constraintKeyStr)) {
+ columnSqls.add("\t" + constraintKeyStr);
+ }
+ }
+ }
+ return String.join(", \n", columnSqls);
+ }
+
+ private String buildColumnIdentifySql(
+ Column column, String catalogName, Map<String, String>
columnTypeMap) {
+ final List<String> columnSqls = new ArrayList<>();
+ columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(),
fieldIde, "`"));
+ String type;
+ if ((SqlType.TIME.equals(column.getDataType().getSqlType())
+ ||
SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
+ && column.getScale() != null) {
+ BasicTypeDefine<OceanBaseMysqlType> typeDefine =
typeConverter.reconvert(column);
+ type = typeDefine.getColumnType();
+ } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)
+ && StringUtils.isNotBlank(column.getSourceType())) {
+ type = column.getSourceType();
+ } else {
+ BasicTypeDefine<OceanBaseMysqlType> typeDefine =
typeConverter.reconvert(column);
+ type = typeDefine.getColumnType();
+ }
+ columnSqls.add(type);
+ columnTypeMap.put(column.getName(), type);
+ // nullable
+ if (column.isNullable()) {
+ columnSqls.add("NULL");
+ } else {
+ columnSqls.add("NOT NULL");
+ }
+
+ if (column.getComment() != null) {
+ columnSqls.add(
+ "COMMENT '"
+ + column.getComment().replace("'",
"''").replace("\\", "\\\\")
+ + "'");
+ }
+
+ return String.join(" ", columnSqls);
+ }
+
+ private String buildPrimaryKeySql() {
+ String key =
+ primaryKey.getColumnNames().stream()
+ .map(columnName -> "`" + columnName + "`")
+ .collect(Collectors.joining(", "));
+ // add sort type
+ return String.format("PRIMARY KEY (%s)",
CatalogUtils.quoteIdentifier(key, fieldIde));
+ }
+
+ private String buildConstraintKeySql(
+ ConstraintKey constraintKey, Map<String, String> columnTypeMap) {
+ ConstraintKey.ConstraintType constraintType =
constraintKey.getConstraintType();
+ String indexColumns =
+ constraintKey.getColumnNames().stream()
+ .map(
+ constraintKeyColumn -> {
+ String columnName =
constraintKeyColumn.getColumnName();
+ boolean withLength = false;
+ if (columnTypeMap.containsKey(columnName))
{
+ String columnType =
columnTypeMap.get(columnName);
+ if (columnType.endsWith("BLOB")
+ ||
columnType.endsWith("TEXT")) {
+ withLength = true;
+ }
+ }
+ if (constraintKeyColumn.getSortType() ==
null) {
+ return String.format(
+ "`%s`%s",
+
CatalogUtils.getFieldIde(columnName, fieldIde),
+ withLength ? "(255)" : "");
+ }
+ return String.format(
+ "`%s`%s %s",
+
CatalogUtils.getFieldIde(columnName, fieldIde),
+ withLength ? "(255)" : "",
+
constraintKeyColumn.getSortType().name());
+ })
+ .collect(Collectors.joining(", "));
+ String keyName = null;
+ switch (constraintType) {
+ case INDEX_KEY:
+ keyName = "KEY";
+ break;
+ case UNIQUE_KEY:
+ keyName = "UNIQUE KEY";
+ break;
+ case FOREIGN_KEY:
+ keyName = "FOREIGN KEY";
+ // todo:
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported constraint type: " + constraintType);
+ }
+ return String.format(
+ "%s `%s` (%s)", keyName, constraintKey.getConstraintName(),
indexColumns);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
index b3a456870c..d25d48b4f2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbas
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
import com.google.auto.service.AutoService;
@@ -44,6 +43,6 @@ public class OceanBaseDialectFactory implements
JdbcDialectFactory {
if ("oracle".equalsIgnoreCase(compatibleMode)) {
return new OracleDialect();
}
- return new MysqlDialect();
+ return new OceanBaseMysqlDialect();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
new file mode 100644
index 0000000000..4e9fa04d0d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TypeConverter.class)
+public class OceanBaseMySqlTypeConverter
+ implements TypeConverter<BasicTypeDefine<OceanBaseMysqlType>> {
+
+ // ============================data types=====================
+ static final String MYSQL_NULL = "NULL";
+ static final String MYSQL_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ static final String MYSQL_TINYINT = "TINYINT";
+ static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ static final String MYSQL_SMALLINT = "SMALLINT";
+ static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+ static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ static final String MYSQL_INT = "INT";
+ static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+ static final String MYSQL_INTEGER = "INTEGER";
+ static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ static final String MYSQL_BIGINT = "BIGINT";
+ static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ static final String MYSQL_DECIMAL = "DECIMAL";
+ static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ static final String MYSQL_FLOAT = "FLOAT";
+ static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ static final String MYSQL_DOUBLE = "DOUBLE";
+ static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ public static final String MYSQL_CHAR = "CHAR";
+ public static final String MYSQL_VARCHAR = "VARCHAR";
+ static final String MYSQL_TINYTEXT = "TINYTEXT";
+ static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+ static final String MYSQL_TEXT = "TEXT";
+ static final String MYSQL_LONGTEXT = "LONGTEXT";
+ static final String MYSQL_JSON = "JSON";
+ static final String MYSQL_ENUM = "ENUM";
+
+ // ------------------------------time-------------------------
+ static final String MYSQL_DATE = "DATE";
+ public static final String MYSQL_DATETIME = "DATETIME";
+ public static final String MYSQL_TIME = "TIME";
+ public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+ static final String MYSQL_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ static final String MYSQL_TINYBLOB = "TINYBLOB";
+ static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+ static final String MYSQL_BLOB = "BLOB";
+ static final String MYSQL_LONGBLOB = "LONGBLOB";
+ static final String MYSQL_BINARY = "BINARY";
+ static final String MYSQL_VARBINARY = "VARBINARY";
+ static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+ public static final int DEFAULT_PRECISION = 38;
+ public static final int MAX_PRECISION = 65;
+ public static final int DEFAULT_SCALE = 18;
+ public static final int MAX_SCALE = 30;
+ public static final int MAX_TIME_SCALE = 6;
+ public static final int MAX_TIMESTAMP_SCALE = 6;
+ public static final long POWER_2_8 = (long) Math.pow(2, 8);
+ public static final long POWER_2_16 = (long) Math.pow(2, 16);
+ public static final long POWER_2_24 = (long) Math.pow(2, 24);
+ public static final long POWER_2_32 = (long) Math.pow(2, 32);
+ public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
+
+ @Override
+ public String identifier() {
+ return DatabaseIdentifier.OCENABASE;
+ }
+
+ @Override
+ public Column convert(BasicTypeDefine typeDefine) {
+ PhysicalColumn.PhysicalColumnBuilder builder =
+ PhysicalColumn.builder()
+ .name(typeDefine.getName())
+ .sourceType(typeDefine.getColumnType())
+ .nullable(typeDefine.isNullable())
+ .defaultValue(typeDefine.getDefaultValue())
+ .comment(typeDefine.getComment());
+
+ String mysqlDataType = typeDefine.getDataType().toUpperCase();
+ if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED")))
{
+ mysqlDataType = mysqlDataType + " UNSIGNED";
+ }
+ switch (mysqlDataType) {
+ case MYSQL_NULL:
+ builder.dataType(BasicType.VOID_TYPE);
+ break;
+ case MYSQL_BIT:
+ if (typeDefine.getLength() == null || typeDefine.getLength()
<= 0) {
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ } else if (typeDefine.getLength() == 1) {
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ } else {
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ // BIT(M) -> BYTE(M/8)
+ long byteLength = typeDefine.getLength() / 8;
+ byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
+ builder.columnLength(byteLength);
+ }
+ break;
+ case MYSQL_TINYINT:
+ if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)"))
{
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ } else {
+ builder.dataType(BasicType.BYTE_TYPE);
+ }
+ break;
+ case MYSQL_TINYINT_UNSIGNED:
+ case MYSQL_SMALLINT:
+ builder.dataType(BasicType.SHORT_TYPE);
+ break;
+ case MYSQL_SMALLINT_UNSIGNED:
+ case MYSQL_MEDIUMINT:
+ case MYSQL_MEDIUMINT_UNSIGNED:
+ case MYSQL_INT:
+ case MYSQL_INTEGER:
+ case MYSQL_YEAR:
+ builder.dataType(BasicType.INT_TYPE);
+ break;
+ case MYSQL_INT_UNSIGNED:
+ case MYSQL_INTEGER_UNSIGNED:
+ case MYSQL_BIGINT:
+ builder.dataType(BasicType.LONG_TYPE);
+ break;
+ case MYSQL_BIGINT_UNSIGNED:
+ DecimalType intDecimalType = new DecimalType(20, 0);
+ builder.dataType(intDecimalType);
+
builder.columnLength(Long.valueOf(intDecimalType.getPrecision()));
+ builder.scale(intDecimalType.getScale());
+ break;
+ case MYSQL_FLOAT:
+ builder.dataType(BasicType.FLOAT_TYPE);
+ break;
+ case MYSQL_FLOAT_UNSIGNED:
+ log.warn("{} will probably cause value overflow.",
MYSQL_FLOAT_UNSIGNED);
+ builder.dataType(BasicType.FLOAT_TYPE);
+ break;
+ case MYSQL_DOUBLE:
+ builder.dataType(BasicType.DOUBLE_TYPE);
+ break;
+ case MYSQL_DOUBLE_UNSIGNED:
+ log.warn("{} will probably cause value overflow.",
MYSQL_DOUBLE_UNSIGNED);
+ builder.dataType(BasicType.DOUBLE_TYPE);
+ break;
+ case MYSQL_DECIMAL:
+ Preconditions.checkArgument(typeDefine.getPrecision() > 0);
+
+ DecimalType decimalType;
+ if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
+ log.warn("{} will probably cause value overflow.",
MYSQL_DECIMAL);
+ decimalType = new DecimalType(DEFAULT_PRECISION,
DEFAULT_SCALE);
+ } else {
+ decimalType =
+ new DecimalType(
+ typeDefine.getPrecision().intValue(),
+ typeDefine.getScale() == null
+ ? 0
+ :
typeDefine.getScale().intValue());
+ }
+ builder.dataType(decimalType);
+ builder.columnLength(Long.valueOf(decimalType.getPrecision()));
+ builder.scale(decimalType.getScale());
+ break;
+ case MYSQL_DECIMAL_UNSIGNED:
+ Preconditions.checkArgument(typeDefine.getPrecision() > 0);
+
+ log.warn("{} will probably cause value overflow.",
MYSQL_DECIMAL_UNSIGNED);
+ DecimalType decimalUnsignedType =
+ new DecimalType(
+ typeDefine.getPrecision().intValue() + 1,
+ typeDefine.getScale() == null
+ ? 0
+ : typeDefine.getScale().intValue());
+ builder.dataType(decimalUnsignedType);
+
builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision()));
+ builder.scale(decimalUnsignedType.getScale());
+ break;
+ case MYSQL_ENUM:
+ builder.dataType(BasicType.STRING_TYPE);
+ if (typeDefine.getLength() == null || typeDefine.getLength()
<= 0) {
+ builder.columnLength(100L);
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ break;
+ case MYSQL_CHAR:
+ case MYSQL_VARCHAR:
+ if (typeDefine.getLength() == null || typeDefine.getLength()
<= 0) {
+
builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ builder.dataType(BasicType.STRING_TYPE);
+ break;
+ case MYSQL_TINYTEXT:
+ builder.dataType(BasicType.STRING_TYPE);
+ builder.columnLength(POWER_2_8 - 1);
+ break;
+ case MYSQL_TEXT:
+ builder.dataType(BasicType.STRING_TYPE);
+ builder.columnLength(POWER_2_16 - 1);
+ break;
+ case MYSQL_MEDIUMTEXT:
+ builder.dataType(BasicType.STRING_TYPE);
+ builder.columnLength(POWER_2_24 - 1);
+ break;
+ case MYSQL_LONGTEXT:
+ builder.dataType(BasicType.STRING_TYPE);
+ builder.columnLength(POWER_2_32 - 1);
+ break;
+ case MYSQL_JSON:
+ builder.dataType(BasicType.STRING_TYPE);
+ break;
+ case MYSQL_BINARY:
+ case MYSQL_VARBINARY:
+ if (typeDefine.getLength() == null || typeDefine.getLength()
<= 0) {
+ builder.columnLength(1L);
+ } else {
+ builder.columnLength(typeDefine.getLength());
+ }
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ break;
+ case MYSQL_TINYBLOB:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ builder.columnLength(POWER_2_8 - 1);
+ break;
+ case MYSQL_BLOB:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ builder.columnLength(POWER_2_16 - 1);
+ break;
+ case MYSQL_MEDIUMBLOB:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ builder.columnLength(POWER_2_24 - 1);
+ break;
+ case MYSQL_LONGBLOB:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ builder.columnLength(POWER_2_32 - 1);
+ break;
+ case MYSQL_GEOMETRY:
+ builder.dataType(PrimitiveByteArrayType.INSTANCE);
+ break;
+ case MYSQL_DATE:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+ break;
+ case MYSQL_TIME:
+ builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
+ builder.scale(typeDefine.getScale());
+ break;
+ case MYSQL_DATETIME:
+ case MYSQL_TIMESTAMP:
+ builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+ builder.scale(typeDefine.getScale());
+ break;
+ default:
+ throw CommonError.convertToSeaTunnelTypeError(
+ DatabaseIdentifier.OCENABASE, mysqlDataType,
typeDefine.getName());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public BasicTypeDefine<OceanBaseMysqlType> reconvert(Column column) {
+ BasicTypeDefine.BasicTypeDefineBuilder builder =
+ BasicTypeDefine.<OceanBaseMysqlType>builder()
+ .name(column.getName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .defaultValue(column.getDefaultValue());
+ switch (column.getDataType().getSqlType()) {
+ case NULL:
+ builder.nativeType(OceanBaseMysqlType.NULL);
+ builder.columnType(MYSQL_NULL);
+ builder.dataType(MYSQL_NULL);
+ break;
+ case BOOLEAN:
+ builder.nativeType(OceanBaseMysqlType.BOOLEAN);
+ builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1));
+ builder.dataType(MYSQL_TINYINT);
+ builder.length(1L);
+ break;
+ case TINYINT:
+ builder.nativeType(OceanBaseMysqlType.TINYINT);
+ builder.columnType(MYSQL_TINYINT);
+ builder.dataType(MYSQL_TINYINT);
+ break;
+ case SMALLINT:
+ builder.nativeType(OceanBaseMysqlType.SMALLINT);
+ builder.columnType(MYSQL_SMALLINT);
+ builder.dataType(MYSQL_SMALLINT);
+ break;
+ case INT:
+ builder.nativeType(OceanBaseMysqlType.INT);
+ builder.columnType(MYSQL_INT);
+ builder.dataType(MYSQL_INT);
+ break;
+ case BIGINT:
+ builder.nativeType(OceanBaseMysqlType.BIGINT);
+ builder.columnType(MYSQL_BIGINT);
+ builder.dataType(MYSQL_BIGINT);
+ break;
+ case FLOAT:
+ builder.nativeType(OceanBaseMysqlType.FLOAT);
+ builder.columnType(MYSQL_FLOAT);
+ builder.dataType(MYSQL_FLOAT);
+ break;
+ case DOUBLE:
+ builder.nativeType(OceanBaseMysqlType.DOUBLE);
+ builder.columnType(MYSQL_DOUBLE);
+ builder.dataType(MYSQL_DOUBLE);
+ break;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) column.getDataType();
+ long precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ if (precision <= 0) {
+ precision = DEFAULT_PRECISION;
+ scale = DEFAULT_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which is precision less than 0, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ precision,
+ scale);
+ } else if (precision > MAX_PRECISION) {
+ scale = (int) Math.max(0, scale - (precision -
MAX_PRECISION));
+ precision = MAX_PRECISION;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which exceeds the maximum precision of
{}, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ MAX_PRECISION,
+ precision,
+ scale);
+ }
+ if (scale < 0) {
+ scale = 0;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which is scale less than 0, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ precision,
+ scale);
+ } else if (scale > MAX_SCALE) {
+ scale = MAX_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is out
of range, "
+ + "which exceeds the maximum scale of {}, "
+ + "it will be converted to decimal({},{})",
+ column.getName(),
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ MAX_SCALE,
+ precision,
+ scale);
+ }
+
+ builder.nativeType(OceanBaseMysqlType.DECIMAL);
+ builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL,
precision, scale));
+ builder.dataType(MYSQL_DECIMAL);
+ builder.precision(precision);
+ builder.scale(scale);
+ break;
+ case BYTES:
+ if (column.getColumnLength() == null ||
column.getColumnLength() <= 0) {
+ builder.nativeType(OceanBaseMysqlType.VARBINARY);
+ builder.columnType(
+ String.format("%s(%s)", MYSQL_VARBINARY,
MAX_VARBINARY_LENGTH / 2));
+ builder.dataType(MYSQL_VARBINARY);
+ } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) {
+ builder.nativeType(OceanBaseMysqlType.VARBINARY);
+ builder.columnType(
+ String.format("%s(%s)", MYSQL_VARBINARY,
column.getColumnLength()));
+ builder.dataType(MYSQL_VARBINARY);
+ } else if (column.getColumnLength() < POWER_2_24) {
+ builder.nativeType(OceanBaseMysqlType.MEDIUMBLOB);
+ builder.columnType(MYSQL_MEDIUMBLOB);
+ builder.dataType(MYSQL_MEDIUMBLOB);
+ } else {
+ builder.nativeType(OceanBaseMysqlType.LONGBLOB);
+ builder.columnType(MYSQL_LONGBLOB);
+ builder.dataType(MYSQL_LONGBLOB);
+ }
+ break;
+ case STRING:
+ if (column.getColumnLength() == null ||
column.getColumnLength() <= 0) {
+ builder.nativeType(OceanBaseMysqlType.LONGTEXT);
+ builder.columnType(MYSQL_LONGTEXT);
+ builder.dataType(MYSQL_LONGTEXT);
+ } else if (column.getColumnLength() < POWER_2_8) {
+ builder.nativeType(OceanBaseMysqlType.VARCHAR);
+ builder.columnType(
+ String.format("%s(%s)", MYSQL_VARCHAR,
column.getColumnLength()));
+ builder.dataType(MYSQL_VARCHAR);
+ } else if (column.getColumnLength() < POWER_2_16) {
+ builder.nativeType(OceanBaseMysqlType.TEXT);
+ builder.columnType(MYSQL_TEXT);
+ builder.dataType(MYSQL_TEXT);
+ } else if (column.getColumnLength() < POWER_2_24) {
+ builder.nativeType(OceanBaseMysqlType.MEDIUMTEXT);
+ builder.columnType(MYSQL_MEDIUMTEXT);
+ builder.dataType(MYSQL_MEDIUMTEXT);
+ } else {
+ builder.nativeType(OceanBaseMysqlType.LONGTEXT);
+ builder.columnType(MYSQL_LONGTEXT);
+ builder.dataType(MYSQL_LONGTEXT);
+ }
+ break;
+ case DATE:
+ builder.nativeType(OceanBaseMysqlType.DATE);
+ builder.columnType(MYSQL_DATE);
+ builder.dataType(MYSQL_DATE);
+ break;
+ case TIME:
+ builder.nativeType(OceanBaseMysqlType.TIME);
+ builder.dataType(MYSQL_TIME);
+ if (column.getScale() != null && column.getScale() > 0) {
+ int timeScale = column.getScale();
+ if (timeScale > MAX_TIME_SCALE) {
+ timeScale = MAX_TIME_SCALE;
+ log.warn(
+ "The time column {} type time({}) is out of
range, "
+ + "which exceeds the maximum scale of
{}, "
+ + "it will be converted to time({})",
+ column.getName(),
+ column.getScale(),
+ MAX_SCALE,
+ timeScale);
+ }
+ builder.columnType(String.format("%s(%s)", MYSQL_TIME,
timeScale));
+ builder.scale(timeScale);
+ } else {
+ builder.columnType(MYSQL_TIME);
+ }
+ break;
+ case TIMESTAMP:
+ builder.nativeType(OceanBaseMysqlType.DATETIME);
+ builder.dataType(MYSQL_DATETIME);
+ if (column.getScale() != null && column.getScale() > 0) {
+ int timestampScale = column.getScale();
+ if (timestampScale > MAX_TIMESTAMP_SCALE) {
+ timestampScale = MAX_TIMESTAMP_SCALE;
+ log.warn(
+ "The timestamp column {} type timestamp({}) is
out of range, "
+ + "which exceeds the maximum scale of
{}, "
+ + "it will be converted to
timestamp({})",
+ column.getName(),
+ column.getScale(),
+ MAX_TIMESTAMP_SCALE,
+ timestampScale);
+ }
+ builder.columnType(String.format("%s(%s)", MYSQL_DATETIME,
timestampScale));
+ builder.scale(timestampScale);
+ } else {
+ builder.columnType(MYSQL_DATETIME);
+ }
+ break;
+ default:
+ throw CommonError.convertToConnectorTypeError(
+ DatabaseIdentifier.OCENABASE,
+ column.getDataType().getSqlType().name(),
+ column.getName());
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java
new file mode 100644
index 0000000000..e4d6e8b973
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+public class OceanBaseMySqlTypeMapper implements JdbcDialectTypeMapper {
+
+ private OceanBaseMySqlTypeConverter typeConverter;
+
+ public OceanBaseMySqlTypeMapper() {
+ this.typeConverter = new OceanBaseMySqlTypeConverter();
+ }
+
+ public OceanBaseMySqlTypeMapper(OceanBaseMySqlTypeConverter typeConverter)
{
+ this.typeConverter = typeConverter;
+ }
+
+ @Override
+ public Column mappingColumn(BasicTypeDefine typeDefine) {
+ return typeConverter.convert(typeDefine);
+ }
+
+ @Override
+ public Column mappingColumn(ResultSetMetaData metadata, int colIndex)
throws SQLException {
+ String columnName = metadata.getColumnLabel(colIndex);
+ // e.g. tinyint unsigned
+ String nativeType = metadata.getColumnTypeName(colIndex);
+ int isNullable = metadata.isNullable(colIndex);
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+
+ if (Arrays.asList("CHAR", "VARCHAR", "ENUM").contains(nativeType)) {
+ long octetLength = TypeDefineUtils.charTo4ByteLength((long)
precision);
+ precision = (int) Math.max(precision, octetLength);
+ }
+
+ BasicTypeDefine typeDefine =
+ BasicTypeDefine.builder()
+ .name(columnName)
+ .columnType(nativeType)
+ .dataType(nativeType)
+ .nullable(isNullable ==
ResultSetMetaData.columnNullable)
+ .length((long) precision)
+ .precision((long) precision)
+ .scale(scale)
+ .build();
+ return mappingColumn(typeDefine);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
new file mode 100644
index 0000000000..83d3220b12
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class OceanBaseMysqlDialect implements JdbcDialect {
+
+ private static final List NOT_SUPPORTED_DEFAULT_VALUES =
+ Arrays.asList(
+ OceanBaseMysqlType.BLOB,
+ OceanBaseMysqlType.TEXT,
+ OceanBaseMysqlType.JSON,
+ OceanBaseMysqlType.GEOMETRY);
+
+ public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+ public OceanBaseMysqlDialect() {}
+
+ public OceanBaseMysqlDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
+ @Override
+ public String dialectName() {
+ return DatabaseIdentifier.OCENABASE;
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new OceanBaseMysqlJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new OceanBaseMySqlTypeMapper();
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + getFieldIde(identifier, fieldIde) + "`";
+ }
+
+ @Override
+ public String quoteDatabaseIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(
+ String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
+ String updateClause =
+ Arrays.stream(fieldNames)
+ .map(
+ fieldName ->
+ quoteIdentifier(fieldName)
+ + "=VALUES("
+ + quoteIdentifier(fieldName)
+ + ")")
+ .collect(Collectors.joining(", "));
+ String upsertSQL =
+ getInsertIntoStatement(database, tableName, fieldNames)
+ + " ON DUPLICATE KEY UPDATE "
+ + updateClause;
+ return Optional.of(upsertSQL);
+ }
+
+ @Override
+ public PreparedStatement creatPreparedStatement(
+ Connection connection, String queryTemplate, int fetchSize) throws
SQLException {
+ PreparedStatement statement =
+ connection.prepareStatement(
+ queryTemplate, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(Integer.MIN_VALUE);
+ return statement;
+ }
+
+ @Override
+ public String extractTableName(TablePath tablePath) {
+ return tablePath.getTableName();
+ }
+
+ @Override
+ public Map<String, String> defaultParameter() {
+ HashMap<String, String> map = new HashMap<>();
+ map.put("rewriteBatchedStatements", "true");
+ return map;
+ }
+
+ @Override
+ public TablePath parse(String tablePath) {
+ return TablePath.of(tablePath, false);
+ }
+
+ @Override
+ public Object[] sampleDataFromColumn(
+ Connection connection,
+ JdbcSourceTable table,
+ String columnName,
+ int samplingRate,
+ int fetchSize)
+ throws Exception {
+ String sampleQuery;
+ if (StringUtils.isNotBlank(table.getQuery())) {
+ sampleQuery =
+ String.format(
+ "SELECT %s FROM (%s) AS T",
+ quoteIdentifier(columnName), table.getQuery());
+ } else {
+ sampleQuery =
+ String.format(
+ "SELECT %s FROM %s",
+ quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()));
+ }
+
+ try (Statement stmt =
+ connection.createStatement(
+ ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
+ stmt.setFetchSize(Integer.MIN_VALUE);
+ try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
+ int count = 0;
+ List<Object> results = new ArrayList<>();
+
+ while (rs.next()) {
+ count++;
+ if (count % samplingRate == 0) {
+ results.add(rs.getObject(1));
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
+ }
+ Object[] resultsArray = results.toArray();
+ Arrays.sort(resultsArray);
+ return resultsArray;
+ }
+ }
+ }
+
+ @Override
+ public Long approximateRowCntStatement(Connection connection,
JdbcSourceTable table)
+ throws SQLException {
+
+ // 1. If no query is configured, use TABLE STATUS.
+ // 2. If a query is configured but does not contain a WHERE clause and
tablePath is
+ // configured , use TABLE STATUS.
+ // 3. If a query is configured with a WHERE clause, or a query
statement is configured but
+ // tablePath is TablePath.DEFAULT, use COUNT(*).
+
+ boolean useTableStats =
+ StringUtils.isBlank(table.getQuery())
+ || (!table.getQuery().toLowerCase().contains("where")
+ && table.getTablePath() != null
+ && !TablePath.DEFAULT
+ .getFullName()
+
.equals(table.getTablePath().getFullName()));
+
+ if (useTableStats) {
+ // The statement used to get approximate row count which is less
+ // accurate than COUNT(*), but is more efficient for large table.
+ TablePath tablePath = table.getTablePath();
+ String useDatabaseStatement =
+ String.format("USE %s;",
quoteDatabaseIdentifier(tablePath.getDatabaseName()));
+ String rowCountQuery =
+ String.format("SHOW TABLE STATUS LIKE '%s';",
tablePath.getTableName());
+
+ try (Statement stmt = connection.createStatement()) {
+ log.info("Split Chunk, approximateRowCntStatement: {}",
useDatabaseStatement);
+ stmt.execute(useDatabaseStatement);
+ log.info("Split Chunk, approximateRowCntStatement: {}",
rowCountQuery);
+ try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
+ if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
+ throw new SQLException(
+ String.format(
+ "No result returned after running
query [%s]",
+ rowCountQuery));
+ }
+ return rs.getLong(5);
+ }
+ }
+ }
+
+ return SQLUtils.countForSubquery(connection, table.getQuery());
+ }
+
+ @Override
+ public void refreshTableSchemaBySchemaChangeEvent(
+ String sourceDialectName,
+ AlterTableColumnEvent event,
+ JdbcConnectionProvider refreshTableSchemaConnectionProvider,
+ TablePath sinkTablePath) {
+ try (Connection connection =
+
refreshTableSchemaConnectionProvider.getOrEstablishConnection();
+ Statement stmt = connection.createStatement()) {
+ String alterTableSql = generateAlterTableSql(sourceDialectName,
event, sinkTablePath);
+ log.info("Apply schema change with sql: {}", alterTableSql);
+ stmt.execute(alterTableSql);
+ } catch (Exception e) {
+ throw new JdbcConnectorException(
+
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
+ }
+ }
+
+ @Override
+ public String decorateWithComment(String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
+ OceanBaseMysqlType nativeType = (OceanBaseMysqlType)
typeBasicTypeDefine.getNativeType();
+ if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
+ return basicSql;
+ }
+ return JdbcDialect.super.decorateWithComment(basicSql,
typeBasicTypeDefine);
+ }
+
+ @Override
+ public boolean needsQuotesWithDefaultValue(String sqlType) {
+ OceanBaseMysqlType mysqlType = OceanBaseMysqlType.getByName(sqlType);
+ switch (mysqlType) {
+ case CHAR:
+ case VARCHAR:
+ case TEXT:
+ case TINYTEXT:
+ case MEDIUMTEXT:
+ case LONGTEXT:
+ case ENUM:
+ case SET:
+ case BLOB:
+ case TINYBLOB:
+ case MEDIUMBLOB:
+ case LONGBLOB:
+ case DATE:
+ case DATETIME:
+ case TIMESTAMP:
+ case TIME:
+ case YEAR:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isSpecialDefaultValue(Object defaultValue) {
+ return MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
similarity index 50%
copy from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
copy to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index b3a456870c..2033518108 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -17,33 +17,26 @@
package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
-import com.google.auto.service.AutoService;
-
-import javax.annotation.Nonnull;
-
-@AutoService(JdbcDialectFactory.class)
-public class OceanBaseDialectFactory implements JdbcDialectFactory {
- @Override
- public boolean acceptsURL(String url) {
- return url.startsWith("jdbc:oceanbase:");
- }
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
- public JdbcDialect create() {
- throw new UnsupportedOperationException(
- "Can't create JdbcDialect without compatible mode for
OceanBase");
+ public String converterName() {
+ return DatabaseIdentifier.OCENABASE;
}
@Override
- public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
- if ("oracle".equalsIgnoreCase(compatibleMode)) {
- return new OracleDialect();
- }
- return new MysqlDialect();
+ protected void writeTime(PreparedStatement statement, int index, LocalTime
time)
+ throws SQLException {
+ // Write to time column using timestamp retains milliseconds
+ statement.setTimestamp(
+ index,
java.sql.Timestamp.valueOf(LocalDateTime.of(LocalDate.now(), time)));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java
new file mode 100644
index 0000000000..01f8141c39
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlType.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.SQLType;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.LocalDateTime;
+
+public enum OceanBaseMysqlType implements SQLType {
+ DECIMAL(
+ "DECIMAL",
+ Types.DECIMAL,
+ BigDecimal.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 65L,
+ "[(M[,D])] [UNSIGNED] [ZEROFILL]"),
+
+ DECIMAL_UNSIGNED(
+ "DECIMAL UNSIGNED",
+ Types.DECIMAL,
+ BigDecimal.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 65L,
+ "[(M[,D])] [UNSIGNED] [ZEROFILL]"),
+
+ TINYINT(
+ "TINYINT",
+ Types.TINYINT,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 3L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ TINYINT_UNSIGNED(
+ "TINYINT UNSIGNED",
+ Types.TINYINT,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 3L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ BOOLEAN("BOOLEAN", Types.BOOLEAN, Boolean.class, 0,
OceanBaseMysqlType.IS_NOT_DECIMAL, 3L, ""),
+
+ SMALLINT(
+ "SMALLINT",
+ Types.SMALLINT,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 5L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ SMALLINT_UNSIGNED(
+ "SMALLINT UNSIGNED",
+ Types.SMALLINT,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 5L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ INT(
+ "INT",
+ Types.INTEGER,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 10L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ INT_UNSIGNED(
+ "INT UNSIGNED",
+ Types.INTEGER,
+ Long.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 10L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ FLOAT(
+ "FLOAT",
+ Types.REAL,
+ Float.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 12L,
+ "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+
+ FLOAT_UNSIGNED(
+ "FLOAT UNSIGNED",
+ Types.REAL,
+ Float.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 12L,
+ "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+
+ DOUBLE(
+ "DOUBLE",
+ Types.DOUBLE,
+ Double.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 22L,
+ "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+
+ DOUBLE_UNSIGNED(
+ "DOUBLE UNSIGNED",
+ Types.DOUBLE,
+ Double.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 22L,
+ "[(M,D)] [UNSIGNED] [ZEROFILL]"),
+ /** FIELD_TYPE_NULL = 6 */
+ NULL("NULL", Types.NULL, Object.class, 0,
OceanBaseMysqlType.IS_NOT_DECIMAL, 0L, ""),
+
+ TIMESTAMP(
+ "TIMESTAMP",
+ Types.TIMESTAMP,
+ Timestamp.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 26L,
+ "[(fsp)]"),
+
+ BIGINT(
+ "BIGINT",
+ Types.BIGINT,
+ Long.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 19L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ BIGINT_UNSIGNED(
+ "BIGINT UNSIGNED",
+ Types.BIGINT,
+ BigInteger.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 20L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ MEDIUMINT(
+ "MEDIUMINT",
+ Types.INTEGER,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 7L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ MEDIUMINT_UNSIGNED(
+ "MEDIUMINT UNSIGNED",
+ Types.INTEGER,
+ Integer.class,
+ OceanBaseMysqlType.FIELD_FLAG_UNSIGNED |
OceanBaseMysqlType.FIELD_FLAG_ZEROFILL,
+ OceanBaseMysqlType.IS_DECIMAL,
+ 8L,
+ "[(M)] [UNSIGNED] [ZEROFILL]"),
+
+ DATE("DATE", Types.DATE, Date.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL,
10L, ""),
+
+ TIME("TIME", Types.TIME, Time.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL,
16L, "[(fsp)]"),
+
+ DATETIME(
+ "DATETIME",
+ Types.TIMESTAMP,
+ LocalDateTime.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 26L,
+ "[(fsp)]"),
+
+ YEAR("YEAR", Types.DATE, Date.class, 0, OceanBaseMysqlType.IS_NOT_DECIMAL,
4L, "[(4)]"),
+
+ VARCHAR(
+ "VARCHAR",
+ Types.VARCHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 65535L,
+ "(M) [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+ VARBINARY(
+ "VARBINARY",
+ Types.VARBINARY,
+ null,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 65535L,
+ "(M)"),
+
+ BIT("BIT", Types.BIT, Boolean.class, 0, OceanBaseMysqlType.IS_DECIMAL, 1L,
"[(M)]"),
+
+ JSON(
+ "JSON",
+ Types.LONGVARCHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 1073741824L,
+ ""),
+
+ ENUM(
+ "ENUM",
+ Types.CHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 65535L,
+ "('value1','value2',...) [CHARACTER SET charset_name] [COLLATE
collation_name]"),
+
+ SET(
+ "SET",
+ Types.CHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 64L,
+ "('value1','value2',...) [CHARACTER SET charset_name] [COLLATE
collation_name]"),
+
+ TINYBLOB("TINYBLOB", Types.VARBINARY, null, 0,
OceanBaseMysqlType.IS_NOT_DECIMAL, 255L, ""),
+
+ TINYTEXT(
+ "TINYTEXT",
+ Types.VARCHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 255L,
+ " [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+ MEDIUMBLOB(
+ "MEDIUMBLOB",
+ Types.LONGVARBINARY,
+ null,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 16777215L,
+ ""),
+
+ MEDIUMTEXT(
+ "MEDIUMTEXT",
+ Types.LONGVARCHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 16777215L,
+ " [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+ LONGBLOB(
+ "LONGBLOB",
+ Types.LONGVARBINARY,
+ null,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 4294967295L,
+ ""),
+
+ LONGTEXT(
+ "LONGTEXT",
+ Types.LONGVARCHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 4294967295L,
+ " [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+ BLOB("BLOB", Types.LONGVARBINARY, null, 0,
OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, "[(M)]"),
+
+ TEXT(
+ "TEXT",
+ Types.LONGVARCHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 65535L,
+ "[(M)] [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+ CHAR(
+ "CHAR",
+ Types.CHAR,
+ String.class,
+ 0,
+ OceanBaseMysqlType.IS_NOT_DECIMAL,
+ 255L,
+ "[(M)] [CHARACTER SET charset_name] [COLLATE collation_name]"),
+
+ BINARY("BINARY", Types.BINARY, null, 0, OceanBaseMysqlType.IS_NOT_DECIMAL,
255L, "(M)"),
+
+ GEOMETRY("GEOMETRY", Types.BINARY, null, 0,
OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, ""),
+ // is represented by BLOB
+ UNKNOWN("UNKNOWN", Types.OTHER, null, 0,
OceanBaseMysqlType.IS_NOT_DECIMAL, 65535L, "");
+
+ private final String name;
+ protected int jdbcType;
+ protected final Class<?> javaClass;
+ private final int flagsMask;
+ private final boolean isDecimal;
+ private final Long precision;
+ private final String createParams;
+
+ private OceanBaseMysqlType(
+ String oceanBaseMysqlTypeName,
+ int jdbcType,
+ Class<?> javaClass,
+ int allowedFlags,
+ boolean isDec,
+ Long precision,
+ String createParams) {
+ this.name = oceanBaseMysqlTypeName;
+ this.jdbcType = jdbcType;
+ this.javaClass = javaClass;
+ this.flagsMask = allowedFlags;
+ this.isDecimal = isDec;
+ this.precision = precision;
+ this.createParams = createParams;
+ }
+
+ public static final int FIELD_FLAG_UNSIGNED = 32;
+ public static final int FIELD_FLAG_ZEROFILL = 64;
+
+ private static final boolean IS_DECIMAL = true;
+ private static final boolean IS_NOT_DECIMAL = false;
+
+ public static OceanBaseMysqlType getByName(String fullMysqlTypeName) {
+
+ String typeName = "";
+
+ if (fullMysqlTypeName.indexOf("(") != -1) {
+ typeName = fullMysqlTypeName.substring(0,
fullMysqlTypeName.indexOf("(")).trim();
+ } else {
+ typeName = fullMysqlTypeName;
+ }
+
+ // the order of checks is important because some short names could
match parts of longer
+ // names
+ if (StringUtils.indexOfIgnoreCase(typeName, "DECIMAL") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "DEC") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "NUMERIC") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "FIXED") != -1) {
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ? DECIMAL_UNSIGNED
+ : DECIMAL;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYBLOB") != -1) {
+ // IMPORTANT: "TINYBLOB" must be checked before "TINY"
+ return TINYBLOB;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYTEXT") != -1) {
+ // IMPORTANT: "TINYTEXT" must be checked before "TINY"
+ return TINYTEXT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "TINYINT") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "TINY") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "INT1") != -1) {
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? TINYINT_UNSIGNED
+ : TINYINT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMINT") != -1
+ // IMPORTANT: "INT24" must be checked before "INT2"
+ || StringUtils.indexOfIgnoreCase(typeName, "INT24") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "INT3") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "MIDDLEINT") != -1)
{
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? MEDIUMINT_UNSIGNED
+ : MEDIUMINT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "SMALLINT") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "INT2") != -1) {
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? SMALLINT_UNSIGNED
+ : SMALLINT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "BIGINT") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "SERIAL") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "INT8") != -1) {
+ // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT
UNIQUE.
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? BIGINT_UNSIGNED
+ : BIGINT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "POINT") != -1) {
+ // also covers "MULTIPOINT"
+ // IMPORTANT: "POINT" must be checked before "INT"
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "INT") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "INTEGER") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "INT4") != -1) {
+ // IMPORTANT: "INT" must be checked after all "*INT*" types
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? INT_UNSIGNED
+ : INT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "DOUBLE") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "REAL") != -1
+ /* || StringUtils.indexOfIgnoreCase(name, "DOUBLE PRECISION")
!= -1 is caught by "DOUBLE" check */
+ // IMPORTANT: "FLOAT8" must be checked before "FLOAT"
+ || StringUtils.indexOfIgnoreCase(typeName, "FLOAT8") != -1) {
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? DOUBLE_UNSIGNED
+ : DOUBLE;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "FLOAT") != -1 /*
+ * || StringUtils.indexOfIgnoreCase(name, "FLOAT4") != -1 is caught by
+ * "FLOAT" check
+ */) {
+ return StringUtils.indexOfIgnoreCase(fullMysqlTypeName,
"UNSIGNED") != -1
+ ||
StringUtils.indexOfIgnoreCase(fullMysqlTypeName, "ZEROFILL") != -1
+ ? FLOAT_UNSIGNED
+ : FLOAT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "NULL") != -1) {
+ return NULL;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "TIMESTAMP") != -1)
{
+ // IMPORTANT: "TIMESTAMP" must be checked before "TIME"
+ return TIMESTAMP;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "DATETIME") != -1) {
+ // IMPORTANT: "DATETIME" must be checked before "DATE" and "TIME"
+ return DATETIME;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "DATE") != -1) {
+ return DATE;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "TIME") != -1) {
+ return TIME;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "YEAR") != -1) {
+ return YEAR;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "LONGBLOB") != -1) {
+ // IMPORTANT: "LONGBLOB" must be checked before "LONG" and "BLOB"
+ return LONGBLOB;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "LONGTEXT") != -1) {
+ // IMPORTANT: "LONGTEXT" must be checked before "LONG" and "TEXT"
+ return LONGTEXT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMBLOB") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "LONG VARBINARY")
!= -1) {
+ // IMPORTANT: "MEDIUMBLOB" must be checked before "BLOB"
+ // IMPORTANT: "LONG VARBINARY" must be checked before "LONG" and
"VARBINARY"
+ return MEDIUMBLOB;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "MEDIUMTEXT") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "LONG VARCHAR") !=
-1
+ || StringUtils.indexOfIgnoreCase(typeName, "LONG") != -1) {
+ // IMPORTANT: "MEDIUMTEXT" must be checked before "TEXT"
+ // IMPORTANT: "LONG VARCHAR" must be checked before "VARCHAR"
+ return MEDIUMTEXT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "VARCHAR") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "NVARCHAR") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "NATIONAL VARCHAR")
!= -1
+ || StringUtils.indexOfIgnoreCase(typeName, "CHARACTER
VARYING") != -1) {
+ // IMPORTANT: "CHARACTER VARYING" must be checked before
"CHARACTER" and "CHAR"
+ return VARCHAR;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "VARBINARY") != -1)
{
+ return VARBINARY;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "BINARY") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "CHAR BYTE") != -1)
{
+ // IMPORTANT: "BINARY" must be checked after all "*BINARY" types
+ // IMPORTANT: "CHAR BYTE" must be checked before "CHAR"
+ return BINARY;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "LINESTRING") !=
-1) {
+ // also covers "MULTILINESTRING"
+ // IMPORTANT: "LINESTRING" must be checked before "STRING"
+ return GEOMETRY;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "STRING") != -1
+ // IMPORTANT: "CHAR" must be checked after all "*CHAR*" types
+ || StringUtils.indexOfIgnoreCase(typeName, "CHAR") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "NCHAR") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "NATIONAL CHAR") !=
-1
+ || StringUtils.indexOfIgnoreCase(typeName, "CHARACTER") != -1)
{
+ return CHAR;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "BOOLEAN") != -1
+ || StringUtils.indexOfIgnoreCase(typeName, "BOOL") != -1) {
+ return BOOLEAN;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "BIT") != -1) {
+ return BIT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "JSON") != -1) {
+ return JSON;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "ENUM") != -1) {
+ return ENUM;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "SET") != -1) {
+ return SET;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "BLOB") != -1) {
+ return BLOB;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "TEXT") != -1) {
+ return TEXT;
+
+ } else if (StringUtils.indexOfIgnoreCase(typeName, "GEOM")
+ != -1 // covers "GEOMETRY", "GEOMETRYCOLLECTION" and
"GEOMCOLLECTION"
+ || StringUtils.indexOfIgnoreCase(typeName, "POINT")
+ != -1 // also covers "MULTIPOINT"
+ || StringUtils.indexOfIgnoreCase(typeName, "POLYGON")
+ != -1 // also covers "MULTIPOLYGON"
+ ) {
+ return GEOMETRY;
+ }
+
+ return UNKNOWN;
+ }
+
+ @Override
+ public String getVendor() {
+ return "com.oceanbase";
+ }
+
+ @Override
+ public Integer getVendorTypeNumber() {
+ return this.jdbcType;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index a689632206..860131041a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -393,6 +393,8 @@ public class JdbcCatalogUtils {
.ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.USERNAME.key(), val));
config.getPassword()
.ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
+ Optional.ofNullable(config.getCompatibleMode())
+ .ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val));
return ReadonlyConfig.fromMap(catalogConfig);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
index 3208473d61..a747058391 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -66,16 +66,10 @@ public class JdbcOceanBaseMysqlIT extends
JdbcOceanBaseITBase {
"bash",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib &&
cd /tmp/seatunnel/plugins/Jdbc/lib && wget "
- + driverUrl()
- + " && wget "
- + mysqlDriverUrl());
+ + driverUrl());
Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
};
- String mysqlDriverUrl() {
- return
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
- }
-
@Override
List<String> configFile() {
return
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");