This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ab60790f0d [Improve][Connector] Add field name to `DataTypeConvertor`
to improve error message (#5782)
ab60790f0d is described below
commit ab60790f0d8600fe169d463b545137034d279301
Author: Jia Fan <[email protected]>
AuthorDate: Mon Nov 6 11:40:19 2023 +0800
[Improve][Connector] Add field name to `DataTypeConvertor` to improve error
message (#5782)
---
.../table/catalog/DataTypeConvertException.java | 27 +--
.../api/table/catalog/DataTypeConvertor.java | 11 +-
.../doris/datatype/DorisDataTypeConvertor.java | 18 +-
.../doris/datatype/DataTypeConvertorTest.java | 59 +++++
.../catalog/ElasticSearchCatalog.java | 3 +-
.../catalog/ElasticSearchDataTypeConvertor.java | 10 +-
.../elasticsearch/source/ElasticsearchSource.java | 2 +-
.../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 8 +-
.../jdbc/catalog/dm/DamengDataTypeConvertor.java | 18 +-
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 7 +-
.../catalog/mysql/MysqlCreateTableSqlBuilder.java | 8 +-
.../jdbc/catalog/mysql/MysqlDataTypeConvertor.java | 17 +-
.../jdbc/catalog/oracle/OracleCatalog.java | 8 +-
.../oracle/OracleCreateTableSqlBuilder.java | 4 +-
.../catalog/oracle/OracleDataTypeConvertor.java | 17 +-
.../jdbc/catalog/psql/PostgresCatalog.java | 7 +-
.../psql/PostgresCreateTableSqlBuilder.java | 4 +-
.../catalog/psql/PostgresDataTypeConvertor.java | 16 +-
.../redshift/RedshiftDataTypeConvertor.java | 16 +-
.../snowflake/SnowflakeDataTypeConvertor.java | 16 +-
.../jdbc/catalog/sqlserver/SqlServerCatalog.java | 7 +-
.../sqlserver/SqlServerCreateTableSqlBuilder.java | 3 +-
.../sqlserver/SqlServerDataTypeConvertor.java | 20 +-
.../jdbc/catalog/tidb/TiDBDataTypeConvertor.java | 17 +-
.../jdbc/catalog/DataTypeConvertorTest.java | 242 +++++++++++++++++++++
.../jdbc/catalog/MysqlDataTypeConvertorTest.java | 11 +-
.../catalog/SnowflakeDataTypeConvertorTest.java | 6 +-
.../catalog/MaxComputeDataTypeConvertor.java | 55 ++---
.../maxcompute/util/MaxcomputeTypeMapper.java | 5 +-
.../catalog/MaxComputeDataTypeConvertorTest.java | 44 +++-
.../catalog/StarRocksDataTypeConvertor.java | 17 +-
.../starrocks/catalog/DataTypeConvertorTest.java | 63 ++++++
.../{ => catalog}/StarRocksCatalogTest.java | 3 +-
.../{ => catalog}/StarRocksCreateTableTest.java | 2 +-
.../seatunnel/engine/e2e/k8s/KubernetesIT.java | 2 +-
35 files changed, 618 insertions(+), 155 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
index 51f61dd6e4..c5649f548e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
@@ -17,16 +17,12 @@
package org.apache.seatunnel.api.table.catalog;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
public class DataTypeConvertException extends SeaTunnelRuntimeException {
private static final String CONVERT_TO_SEA_TUNNEL_ERROR_MSG =
- "Convert type: %s to SeaTunnel data type error.";
-
- private static final String CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG =
- "Convert SeaTunnel data type: %s to connector data type error.";
+ "Convert type: %s of the %s field to SeaTunnel data type error.";
public DataTypeConvertException(String message) {
this(message, null);
@@ -36,26 +32,9 @@ public class DataTypeConvertException extends
SeaTunnelRuntimeException {
super(CommonErrorCode.UNSUPPORTED_DATA_TYPE, message, cause);
}
- public static DataTypeConvertException
convertToSeaTunnelDataTypeException(Object dataType) {
- return new DataTypeConvertException(
- String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, dataType));
- }
-
public static DataTypeConvertException convertToSeaTunnelDataTypeException(
- Object dataType, Throwable cause) {
- return new DataTypeConvertException(
- String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, dataType),
cause);
- }
-
- public static DataTypeConvertException convertToConnectorDataTypeException(
- SeaTunnelDataType<?> seaTunnelDataType) {
- return new DataTypeConvertException(
- String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG,
seaTunnelDataType));
- }
-
- public static DataTypeConvertException convertToConnectorDataTypeException(
- SeaTunnelDataType<?> seaTunnelDataType, Throwable cause) {
+ String field, Object dataType) {
return new DataTypeConvertException(
- String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG,
seaTunnelDataType), cause);
+ String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, dataType,
field));
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
index fb5a152dc3..31e4e818e8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -28,26 +28,29 @@ public interface DataTypeConvertor<T> {
/**
* Transfer the data type from connector to SeaTunnel.
*
+ * @param field The field name of the column
* @param connectorDataType e.g. "int", "varchar(255)"
* @return the data type of SeaTunnel
*/
- SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType);
+ SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType);
/**
* Transfer the data type from connector to SeaTunnel.
*
+ * @param field The field name of the column
* @param connectorDataType origin data type
* @param dataTypeProperties origin data type properties, e.g. precision,
scale, length
* @return SeaTunnel data type
*/
// todo: If the origin data type contains the properties, we can remove
the dataTypeProperties.
SeaTunnelDataType<?> toSeaTunnelType(
- T connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, T connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException;
/**
* Transfer the data type from SeaTunnel to connector.
*
+ * @param field The field name of the column
* @param seaTunnelDataType seaTunnel data type
* @param dataTypeProperties seaTunnel data type properties, e.g.
precision, scale, length
* @return origin data type
@@ -55,7 +58,9 @@ public interface DataTypeConvertor<T> {
// todo: If the SeaTunnel data type contains the properties, we can remove
the
// dataTypeProperties.
T toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException;
String getIdentity();
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
index 7c9f08dfb7..40366ddd31 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
@@ -73,7 +73,7 @@ public class DorisDataTypeConvertor implements
DataTypeConvertor<String> {
public static final Integer DEFAULT_SCALE = 0;
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
checkNotNull(connectorDataType, "connectorDataType can not be null");
Map<String, Object> dataTypeProperties;
switch (connectorDataType.toUpperCase(Locale.ROOT)) {
@@ -99,12 +99,12 @@ public class DorisDataTypeConvertor implements
DataTypeConvertor<String> {
dataTypeProperties = Collections.emptyMap();
break;
}
- return toSeaTunnelType(connectorDataType, dataTypeProperties);
+ return toSeaTunnelType(field, connectorDataType, dataTypeProperties);
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- String connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, String connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "mysqlType can not be null");
int precision;
@@ -145,13 +145,17 @@ public class DorisDataTypeConvertor implements
DataTypeConvertor<String> {
return new DecimalType(precision, scale);
default:
throw new UnsupportedOperationException(
- String.format("Doesn't support DORIS type '%s''
yet.", connectorDataType));
+ String.format(
+ "Doesn't support Doris type '%s' of the '%s'
field yet.",
+ connectorDataType, field));
}
}
@Override
public String toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -189,7 +193,9 @@ public class DorisDataTypeConvertor implements
DataTypeConvertor<String> {
return TIMESTAMP;
default:
throw new UnsupportedOperationException(
- String.format("Doesn't support Doris type '%s''
yet.", sqlType));
+ String.format(
+ "Doris doesn't support SeaTunnel type '%s' of
the '%s' field yet.",
+ sqlType, field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DataTypeConvertorTest.java
new file mode 100644
index 0000000000..9a0d46c11a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DataTypeConvertorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.datatype;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+public class DataTypeConvertorTest {
+
+ @Test
+ void testConvertorErrorMsgWithUnsupportedType() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new
SeaTunnelDataType[0]);
+ MultipleRowType multipleRowType =
+ new MultipleRowType(new String[] {"table"}, new
SeaTunnelRowType[] {rowType});
+ DorisDataTypeConvertor doris = new DorisDataTypeConvertor();
+ UnsupportedOperationException exception =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> doris.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "Doesn't support Doris type 'UNSUPPORTED_TYPE' of the 'test'
field yet.",
+ exception.getMessage());
+ UnsupportedOperationException exception2 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> doris.toSeaTunnelType("test",
"UNSUPPORTED_TYPE", new HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support Doris type 'UNSUPPORTED_TYPE' of the 'test'
field yet.",
+ exception2.getMessage());
+ UnsupportedOperationException exception3 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> doris.toConnectorType("test", multipleRowType,
new HashMap<>()));
+ Assertions.assertEquals(
+ "Doris doesn't support SeaTunnel type 'MULTIPLE_ROW' of the
'test' field yet.",
+ exception3.getMessage());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 3edaaa2846..8ca60925b0 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -148,7 +148,8 @@ public class ElasticSearchCatalog implements Catalog {
PhysicalColumn physicalColumn =
PhysicalColumn.of(
fieldName,
-
elasticSearchDataTypeConvertor.toSeaTunnelType(fieldType),
+
elasticSearchDataTypeConvertor.toSeaTunnelType(
+ fieldName, fieldType),
null,
true,
null,
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index a1bd50bbda..acd3d6aba9 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -47,13 +47,13 @@ public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<String>
public static final String DATE = "date";
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
- return toSeaTunnelType(connectorDataType, null);
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
+ return toSeaTunnelType(field, connectorDataType, null);
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- String connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, String connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "connectorDataType can not be null");
switch (connectorDataType) {
@@ -88,7 +88,9 @@ public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<String>
@Override
public String toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index e577695e9f..c7891745f5 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -79,7 +79,7 @@ public class ElasticsearchSource
for (int i = 0; i < source.size(); i++) {
String esType = esFieldType.get(source.get(i));
SeaTunnelDataType seaTunnelDataType =
- elasticSearchDataTypeConvertor.toSeaTunnelType(esType);
+
elasticSearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
fieldTypes[i] = seaTunnelDataType;
}
rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]),
fieldTypes);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index a93a7ee560..edf3e1b380 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -122,7 +122,8 @@ public class DamengCatalog extends AbstractJdbcCatalog {
Object defaultValue = resultSet.getObject("DATA_DEFAULT");
boolean isNullable = resultSet.getString("NULLABLE").equals("Y");
- SeaTunnelDataType<?> type = fromJdbcType(typeName, columnPrecision,
columnScale);
+ SeaTunnelDataType<?> type =
+ fromJdbcType(columnName, typeName, columnPrecision,
columnScale);
return PhysicalColumn.of(
columnName,
@@ -139,11 +140,12 @@ public class DamengCatalog extends AbstractJdbcCatalog {
columnLength);
}
- private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision,
long scale) {
+ private SeaTunnelDataType<?> fromJdbcType(
+ String columnName, String typeName, long precision, long scale) {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(DamengDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(DamengDataTypeConvertor.SCALE, scale);
- return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName,
dataTypeProperties);
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, typeName,
dataTypeProperties);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
index 1a26f95dc2..5cad2122ff 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
@@ -119,12 +119,13 @@ public class DamengDataTypeConvertor implements
DataTypeConvertor<String> {
}
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String dataType) {
- return toSeaTunnelType(dataType, Collections.emptyMap());
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String dataType)
{
+ return toSeaTunnelType(field, dataType, Collections.emptyMap());
}
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String dataType, Map<String,
Object> properties)
+ public SeaTunnelDataType<?> toSeaTunnelType(
+ String field, String dataType, Map<String, Object> properties)
throws DataTypeConvertException {
switch (dataType.toUpperCase()) {
case DM_BIT:
@@ -201,12 +202,15 @@ public class DamengDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_OPERATION,
- String.format("Doesn't support Dmdb type '%s' yet.",
dataType));
+ String.format(
+ "Doesn't support DMDB type '%s' of the '%s'
field yet.",
+ dataType, field));
}
}
@Override
- public String toConnectorType(SeaTunnelDataType<?> dataType, Map<String,
Object> properties)
+ public String toConnectorType(
+ String field, SeaTunnelDataType<?> dataType, Map<String, Object>
properties)
throws DataTypeConvertException {
SqlType sqlType = dataType.getSqlType();
switch (sqlType) {
@@ -238,7 +242,9 @@ public class DamengDataTypeConvertor implements
DataTypeConvertor<String> {
return DM_BINARY;
default:
throw new UnsupportedOperationException(
- String.format("Doesn't support SeaTunnel type '%s'
yet.", dataType));
+ String.format(
+ "Doesn't support SeaTunnel type '%s' of the
'%s' field yet.",
+ dataType, field));
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 1b4a8f3425..6bb1274a80 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -129,7 +129,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
if (sourceType.toLowerCase(Locale.ROOT).contains("unsigned")) {
typeName += "_UNSIGNED";
}
- SeaTunnelDataType<?> type = fromJdbcType(typeName, precision, scale);
+ SeaTunnelDataType<?> type = fromJdbcType(columnName, typeName,
precision, scale);
String comment = resultSet.getString("COLUMN_COMMENT");
Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
String isNullableStr = resultSet.getString("IS_NULLABLE");
@@ -198,12 +198,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
return String.format("DROP DATABASE `%s`;", databaseName);
}
- private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision,
int scale) {
+ private SeaTunnelDataType<?> fromJdbcType(
+ String columnName, String typeName, int precision, int scale) {
MysqlType mysqlType = MysqlType.getByName(typeName);
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, scale);
- return DATA_TYPE_CONVERTOR.toSeaTunnelType(mysqlType,
dataTypeProperties);
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, mysqlType,
dataTypeProperties);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 5f44495a0a..41189ad2ac 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -213,14 +213,18 @@ public class MysqlCreateTableSqlBuilder {
} else {
// Column type
final String name =
-
mysqlDataTypeConvertor.toConnectorType(column.getDataType(), null).getName();
+ mysqlDataTypeConvertor
+ .toConnectorType(column.getName(),
column.getDataType(), null)
+ .getName();
if (columnLength == 0
&& StringUtils.equalsIgnoreCase(name,
MysqlType.VARCHAR.getName())) {
columnSqls.add(MysqlType.LONGTEXT.getName());
return;
}
columnSqls.add(
-
mysqlDataTypeConvertor.toConnectorType(column.getDataType(), null).getName());
+ mysqlDataTypeConvertor
+ .toConnectorType(column.getName(),
column.getDataType(), null)
+ .getName());
String fieSql = "";
List<String> list = new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
index f535b3580b..abe6fda112 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
@@ -51,7 +51,7 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
public static final Integer DEFAULT_SCALE = 0;
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
checkNotNull(connectorDataType, "connectorDataType can not be null");
MysqlType mysqlType = MysqlType.getByName(connectorDataType);
Map<String, Object> dataTypeProperties;
@@ -81,14 +81,14 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
dataTypeProperties = Collections.emptyMap();
break;
}
- return toSeaTunnelType(mysqlType, dataTypeProperties);
+ return toSeaTunnelType(field, mysqlType, dataTypeProperties);
}
// todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType
doesn't contains
// properties.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- MysqlType mysqlType, Map<String, Object> dataTypeProperties)
+ String field, MysqlType mysqlType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(mysqlType, "mysqlType can not be null");
int precision;
@@ -158,13 +158,16 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
return new DecimalType(precision, scale);
// TODO: support 'SET' & 'YEAR' type
default:
- throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+ throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(
+ field, mysqlType);
}
}
@Override
public MysqlType toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
SqlType sqlType = seaTunnelDataType.getSqlType();
// todo: verify
@@ -205,7 +208,9 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- String.format("Doesn't support MySQL type '%s' yet",
sqlType));
+ String.format(
+ "Doesn't support MySQL type '%s' of the '%s'
field yet",
+ sqlType, field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 0097a6008c..606b0a8f3f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -174,7 +174,8 @@ public class OracleCatalog extends AbstractJdbcCatalog {
Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
- SeaTunnelDataType<?> type = fromJdbcType(typeName, columnPrecision,
columnScale);
+ SeaTunnelDataType<?> type =
+ fromJdbcType(columnName, typeName, columnPrecision,
columnScale);
long bitLen = 0;
switch (typeName) {
case ORACLE_LONG:
@@ -214,11 +215,12 @@ public class OracleCatalog extends AbstractJdbcCatalog {
columnLength);
}
- private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision,
long scale) {
+ private SeaTunnelDataType<?> fromJdbcType(
+ String columnName, String typeName, long precision, long scale) {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(OracleDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(OracleDataTypeConvertor.SCALE, scale);
- return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName,
dataTypeProperties);
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, typeName,
dataTypeProperties);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
index b10cdd93dc..99bcfedd6d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
@@ -122,7 +122,9 @@ public class OracleCreateTableSqlBuilder {
return "CLOB";
}
default:
- String type =
oracleDataTypeConvertor.toConnectorType(column.getDataType(), null);
+ String type =
+ oracleDataTypeConvertor.toConnectorType(
+ column.getName(), column.getDataType(), null);
if (type.equals("NUMBER")) {
if (column.getDataType() instanceof DecimalType) {
DecimalType decimalType = (DecimalType)
column.getDataType();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
index f290fa2688..24ca049ce9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
@@ -79,13 +79,13 @@ public class OracleDataTypeConvertor implements
DataTypeConvertor<String> {
public static final String ORACLE_LONG_RAW = "LONG RAW";
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
- return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
+ return toSeaTunnelType(field, connectorDataType,
Collections.emptyMap());
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- String connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, String connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "Oracle Type cannot be null");
connectorDataType = normalizeTimestamp(connectorDataType);
@@ -142,13 +142,17 @@ public class OracleDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_OPERATION,
- String.format("Doesn't support ORACLE type '%s' yet.",
connectorDataType));
+ String.format(
+ "Doesn't support ORACLE type '%s' of the '%s'
field yet.",
+ connectorDataType, field));
}
}
@Override
public String toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -178,7 +182,8 @@ public class OracleDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support SeaTunnel type '%s' yet.",
seaTunnelDataType));
+ "Doesn't support SeaTunnel type '%s' of the
'%s' field yet.",
+ seaTunnelDataType.getSqlType(), field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index af89a6c6c1..0a447eb1e2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -145,7 +145,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
defaultValue = null;
}
- SeaTunnelDataType<?> type = fromJdbcType(typeName, columnLength,
columnScale);
+ SeaTunnelDataType<?> type = fromJdbcType(columnName, typeName,
columnLength, columnScale);
long bitLen = 0;
switch (typeName) {
case PG_BYTEA:
@@ -241,11 +241,12 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
super.dropDatabaseInternal(databaseName);
}
- private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision,
long scale) {
+ private SeaTunnelDataType<?> fromJdbcType(
+ String columnName, String typeName, long precision, long scale) {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(PostgresDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, scale);
- return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName,
dataTypeProperties);
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, typeName,
dataTypeProperties);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
index 43269e452c..52266f6a90 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
@@ -157,7 +157,9 @@ public class PostgresCreateTableSqlBuilder {
return "text";
}
default:
- String type =
postgresDataTypeConvertor.toConnectorType(column.getDataType(), null);
+ String type =
+ postgresDataTypeConvertor.toConnectorType(
+ column.getName(), column.getDataType(), null);
if (type.equals(PG_NUMERIC)) {
DecimalType decimalType = (DecimalType)
column.getDataType();
return "numeric("
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
index 2eaa87f862..1599bd84d6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
@@ -111,13 +111,13 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
public static final String PG_XML = "xml";
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
- return toSeaTunnelType(connectorDataType, new HashMap<>(0));
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
+ return toSeaTunnelType(field, connectorDataType, new HashMap<>(0));
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- String connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, String connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "Postgres Type cannot be null");
switch (connectorDataType) {
@@ -189,13 +189,16 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support POSTGRES type '%s'' yet.",
connectorDataType));
+ "Doesn't support POSTGRES type '%s' of the
'%s' field yet.",
+ connectorDataType, field));
}
}
@Override
public String toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -228,7 +231,8 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support SeaTunnel type '%s'' yet.",
seaTunnelDataType));
+ "Doesn't support SeaTunnel type '%s' of the
'%s' field yet.",
+ seaTunnelDataType.getSqlType(), field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index 9bcb52637a..db5984aeec 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -95,13 +95,13 @@ public class RedshiftDataTypeConvertor implements
DataTypeConvertor<String> {
private static final String REDSHIFT_TIMESTAMPTZ = "timestamptz";
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
- return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
+ return toSeaTunnelType(field, connectorDataType,
Collections.emptyMap());
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- String connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, String connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "redshiftType cannot be null");
switch (connectorDataType) {
@@ -157,13 +157,16 @@ public class RedshiftDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support REDSHIFT type '%s'' yet.",
connectorDataType));
+ "Doesn't support REDSHIFT type '%s' of the
'%s' field yet.",
+ connectorDataType, field));
}
}
@Override
public String toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -196,7 +199,8 @@ public class RedshiftDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support SeaTunnel type '%s'' yet.",
seaTunnelDataType));
+ "Doesn't support SeaTunnel type '%s' of the
'%s' field yet.",
+ seaTunnelDataType.getSqlType(), field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
index 17dc5337fa..a6477a8e43 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
@@ -88,13 +88,13 @@ public class SnowflakeDataTypeConvertor implements
DataTypeConvertor<String> {
private static final String SNOWFLAKE_OBJECT = "OBJECT";
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
- return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
+ return toSeaTunnelType(field, connectorDataType,
Collections.emptyMap());
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- String connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, String connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "redshiftType cannot be null");
@@ -151,13 +151,16 @@ public class SnowflakeDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support SNOWFLAKE type '%s' yet.",
connectorDataType));
+ "Doesn't support SNOWFLAKE type '%s' of the
'%s' field yet.",
+ connectorDataType, field));
}
}
@Override
public String toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -191,7 +194,8 @@ public class SnowflakeDataTypeConvertor implements
DataTypeConvertor<String> {
default:
throw new UnsupportedOperationException(
String.format(
- "Doesn't support SeaTunnel type '%s'' yet.",
seaTunnelDataType));
+ "Doesn't support SeaTunnel type '%s' of the
'%s' field yet.",
+ seaTunnelDataType.getSqlType(), field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 5c64dda104..478ef873c7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -105,7 +105,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
int precision = resultSet.getInt("precision");
int scale = resultSet.getInt("scale");
long columnLength = resultSet.getLong("max_length");
- SeaTunnelDataType<?> type = fromJdbcType(sourceType, precision, scale);
+ SeaTunnelDataType<?> type = fromJdbcType(columnName, sourceType,
precision, scale);
String comment = resultSet.getString("comment");
Object defaultValue = resultSet.getObject("default_value");
if (defaultValue != null) {
@@ -178,12 +178,13 @@ public class SqlServerCatalog extends AbstractJdbcCatalog
{
columnLength);
}
- private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision,
int scale) {
+ private SeaTunnelDataType<?> fromJdbcType(
+ String columnName, String typeName, int precision, int scale) {
Pair<SqlServerType, Map<String, Object>> pair =
SqlServerType.parse(typeName);
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(SqlServerDataTypeConvertor.PRECISION,
precision);
dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, scale);
- return DATA_TYPE_CONVERTOR.toSeaTunnelType(pair.getLeft(),
dataTypeProperties);
+ return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, pair.getLeft(),
dataTypeProperties);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
index 3360f4514a..24601ba145 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
@@ -218,7 +218,8 @@ public class SqlServerCreateTableSqlBuilder {
} else {
// Add column type
SqlServerType sqlServerType =
-
sqlServerDataTypeConvertor.toConnectorType(column.getDataType(), null);
+ sqlServerDataTypeConvertor.toConnectorType(
+ column.getName(), column.getDataType(), null);
String typeName = sqlServerType.getName();
String fieldSuffixSql = null;
tyNameDef = typeName;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index 421c3a3de5..df9d956ce1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -44,15 +44,17 @@ public class SqlServerDataTypeConvertor implements
DataTypeConvertor<SqlServerTy
public static final Integer DEFAULT_SCALE = 0;
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(@NonNull String
connectorDataType) {
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, @NonNull String
connectorDataType) {
Pair<SqlServerType, Map<String, Object>> sqlServerType =
SqlServerType.parse(connectorDataType);
- return toSeaTunnelType(sqlServerType.getLeft(),
sqlServerType.getRight());
+ return toSeaTunnelType(field, sqlServerType.getLeft(),
sqlServerType.getRight());
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- @NonNull SqlServerType connectorDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ @NonNull SqlServerType connectorDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
switch (connectorDataType) {
case BIT:
@@ -101,13 +103,17 @@ public class SqlServerDataTypeConvertor implements
DataTypeConvertor<SqlServerTy
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_OPERATION,
- String.format("Doesn't support SQLSERVER type '%s'",
connectorDataType));
+ String.format(
+ "Doesn't support SQLSERVER type '%s' of the
'%s' field",
+ connectorDataType, field));
}
}
@Override
public SqlServerType toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
SqlType sqlType = seaTunnelDataType.getSqlType();
switch (sqlType) {
@@ -140,7 +146,9 @@ public class SqlServerDataTypeConvertor implements
DataTypeConvertor<SqlServerTy
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- String.format("Doesn't support SqlServer type '%s'
yet", sqlType));
+ String.format(
+ "Doesn't support SqlServer type '%s' of the
'%s' field yet",
+ sqlType, field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
index 18b7db3187..7937d3059a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
@@ -51,7 +51,7 @@ public class TiDBDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
public static final Integer DEFAULT_SCALE = 0;
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
checkNotNull(connectorDataType, "connectorDataType can not be null");
MysqlType mysqlType = MysqlType.getByName(connectorDataType);
Map<String, Object> dataTypeProperties;
@@ -80,14 +80,14 @@ public class TiDBDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
dataTypeProperties = Collections.emptyMap();
break;
}
- return toSeaTunnelType(mysqlType, dataTypeProperties);
+ return toSeaTunnelType(field, mysqlType, dataTypeProperties);
}
// todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType
doesn't contains
// properties.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- MysqlType mysqlType, Map<String, Object> dataTypeProperties)
+ String field, MysqlType mysqlType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(mysqlType, "mysqlType can not be null");
int precision;
@@ -157,13 +157,16 @@ public class TiDBDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
return new DecimalType(precision, scale);
// TODO: support 'SET' & 'YEAR' type
default:
- throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+ throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(
+ field, mysqlType);
}
}
@Override
public MysqlType toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
SqlType sqlType = seaTunnelDataType.getSqlType();
// todo: verify
@@ -201,7 +204,9 @@ public class TiDBDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- String.format("Doesn't support TiDB type '%s' yet",
sqlType));
+ String.format(
+ "TiDB doesn't support SeaTunnel type '%s' of
the '%s' field yet",
+ sqlType, field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/DataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/DataTypeConvertorTest.java
new file mode 100644
index 0000000000..4fa195e2df
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/DataTypeConvertorTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm.DamengDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift.RedshiftDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.snowflake.SnowflakeDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static com.mysql.cj.MysqlType.UNKNOWN;
+
+public class DataTypeConvertorTest {
+
+ @Test
+ void testConvertorErrorMsgWithUnsupportedType() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new
SeaTunnelDataType[0]);
+ MultipleRowType multipleRowType =
+ new MultipleRowType(new String[] {"table"}, new
SeaTunnelRowType[] {rowType});
+
+ DamengDataTypeConvertor dameng = new DamengDataTypeConvertor();
+ JdbcConnectorException exception =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> dameng.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported
operation] - Doesn't support DMDB type 'UNSUPPORTED_TYPE' of the 'test' field
yet.",
+ exception.getMessage());
+ JdbcConnectorException exception2 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> dameng.toSeaTunnelType("test",
"UNSUPPORTED_TYPE", new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported
operation] - Doesn't support DMDB type 'UNSUPPORTED_TYPE' of the 'test' field
yet.",
+ exception2.getMessage());
+ UnsupportedOperationException exception3 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> dameng.toConnectorType("test", rowType, new
HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support SeaTunnel type 'ROW<>' of the 'test' field
yet.",
+ exception3.getMessage());
+
+ MysqlDataTypeConvertor mysql = new MysqlDataTypeConvertor();
+ DataTypeConvertException exception4 =
+ Assertions.assertThrows(
+ DataTypeConvertException.class,
+ () -> mysql.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+ exception4.getMessage());
+ DataTypeConvertException exception5 =
+ Assertions.assertThrows(
+ DataTypeConvertException.class,
+ () -> mysql.toSeaTunnelType("test", UNKNOWN, new
HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+ exception5.getMessage());
+ JdbcConnectorException exception6 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> mysql.toConnectorType("test", multipleRowType,
new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Doesn't support MySQL type 'MULTIPLE_ROW' of the 'test' field yet",
+ exception6.getMessage());
+
+ OracleDataTypeConvertor oracle = new OracleDataTypeConvertor();
+ JdbcConnectorException exception7 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> oracle.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported
operation] - Doesn't support ORACLE type 'UNSUPPORTED_TYPE' of the 'test' field
yet.",
+ exception7.getMessage());
+ JdbcConnectorException exception8 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> oracle.toSeaTunnelType("test",
"UNSUPPORTED_TYPE", new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported
operation] - Doesn't support ORACLE type 'UNSUPPORTED_TYPE' of the 'test' field
yet.",
+ exception8.getMessage());
+ UnsupportedOperationException exception9 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> oracle.toConnectorType("test", multipleRowType,
new HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test'
field yet.",
+ exception9.getMessage());
+
+ PostgresDataTypeConvertor postgres = new PostgresDataTypeConvertor();
+ UnsupportedOperationException exception10 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> postgres.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "Doesn't support POSTGRES type 'UNSUPPORTED_TYPE' of the
'test' field yet.",
+ exception10.getMessage());
+ UnsupportedOperationException exception11 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ postgres.toSeaTunnelType(
+ "test", "UNSUPPORTED_TYPE", new
HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support POSTGRES type 'UNSUPPORTED_TYPE' of the
'test' field yet.",
+ exception11.getMessage());
+ UnsupportedOperationException exception12 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> postgres.toConnectorType("test",
multipleRowType, new HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test'
field yet.",
+ exception12.getMessage());
+
+ RedshiftDataTypeConvertor redshift = new RedshiftDataTypeConvertor();
+ UnsupportedOperationException exception13 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> redshift.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "Doesn't support REDSHIFT type 'UNSUPPORTED_TYPE' of the
'test' field yet.",
+ exception13.getMessage());
+ UnsupportedOperationException exception14 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ redshift.toSeaTunnelType(
+ "test", "UNSUPPORTED_TYPE", new
HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support REDSHIFT type 'UNSUPPORTED_TYPE' of the
'test' field yet.",
+ exception14.getMessage());
+ UnsupportedOperationException exception15 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> redshift.toConnectorType("test",
multipleRowType, new HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test'
field yet.",
+ exception15.getMessage());
+
+ SnowflakeDataTypeConvertor snowflake = new
SnowflakeDataTypeConvertor();
+ UnsupportedOperationException exception16 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> snowflake.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "Doesn't support SNOWFLAKE type 'UNSUPPORTED_TYPE' of the
'test' field yet.",
+ exception16.getMessage());
+ UnsupportedOperationException exception17 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ snowflake.toSeaTunnelType(
+ "test", "UNSUPPORTED_TYPE", new
HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support SNOWFLAKE type 'UNSUPPORTED_TYPE' of the
'test' field yet.",
+ exception17.getMessage());
+ UnsupportedOperationException exception18 =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> snowflake.toConnectorType("test",
multipleRowType, new HashMap<>()));
+ Assertions.assertEquals(
+ "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test'
field yet.",
+ exception18.getMessage());
+
+ SqlServerDataTypeConvertor sqlserver = new
SqlServerDataTypeConvertor();
+ JdbcConnectorException exception19 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> sqlserver.toSeaTunnelType("test", "unknown"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported
operation] - Doesn't support SQLSERVER type 'UNKNOWN' of the 'test' field",
+ exception19.getMessage());
+ JdbcConnectorException exception20 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () ->
+ sqlserver.toSeaTunnelType(
+ "test", SqlServerType.UNKNOWN, new
HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported
operation] - Doesn't support SQLSERVER type 'UNKNOWN' of the 'test' field",
+ exception20.getMessage());
+ JdbcConnectorException exception21 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> sqlserver.toConnectorType("test",
multipleRowType, new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Doesn't support SqlServer type 'MULTIPLE_ROW' of the 'test' field yet",
+ exception21.getMessage());
+
+ TiDBDataTypeConvertor tidb = new TiDBDataTypeConvertor();
+ DataTypeConvertException exception22 =
+ Assertions.assertThrows(
+ DataTypeConvertException.class,
+ () -> tidb.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+ exception22.getMessage());
+ DataTypeConvertException exception23 =
+ Assertions.assertThrows(
+ DataTypeConvertException.class,
+ () -> tidb.toSeaTunnelType("test", UNKNOWN, new
HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+ exception23.getMessage());
+ JdbcConnectorException exception24 =
+ Assertions.assertThrows(
+ JdbcConnectorException.class,
+ () -> tidb.toConnectorType("test", multipleRowType,
new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - TiDB doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test' field
yet",
+ exception24.getMessage());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
index 45cb7ec52d..0a23a84bb7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -35,22 +35,23 @@ public class MysqlDataTypeConvertorTest {
@Test
public void toSeaTunnelTypeWithString() {
Assertions.assertEquals(
- new DecimalType(5, 2),
mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5,2)"));
+ new DecimalType(5, 2),
mysqlDataTypeConvertor.toSeaTunnelType("", "DECIMAL(5,2)"));
Assertions.assertEquals(
- new DecimalType(5, 0),
mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5)"));
+ new DecimalType(5, 0),
mysqlDataTypeConvertor.toSeaTunnelType("", "DECIMAL(5)"));
Assertions.assertEquals(
- new DecimalType(10, 0),
mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL"));
+ new DecimalType(10, 0),
mysqlDataTypeConvertor.toSeaTunnelType("", "DECIMAL"));
}
@Test
public void toSeaTunnelType() {
Assertions.assertEquals(
BasicType.VOID_TYPE,
- mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL,
Collections.emptyMap()));
+ mysqlDataTypeConvertor.toSeaTunnelType("", MysqlType.NULL,
Collections.emptyMap()));
Assertions.assertEquals(
BasicType.STRING_TYPE,
- mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR,
Collections.emptyMap()));
+ mysqlDataTypeConvertor.toSeaTunnelType(
+ "", MysqlType.VARCHAR, Collections.emptyMap()));
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
index e0e8cef8c4..acf7fa6b0e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
@@ -37,16 +37,16 @@ public class SnowflakeDataTypeConvertorTest {
Assertions.assertEquals(
BasicType.STRING_TYPE,
snowflakeDataTypeConvertor.toSeaTunnelType(
- SnowflakeType.TEXT.name(), Collections.emptyMap()));
+ "", SnowflakeType.TEXT.name(),
Collections.emptyMap()));
Assertions.assertEquals(
BasicType.STRING_TYPE,
snowflakeDataTypeConvertor.toSeaTunnelType(
- SnowflakeType.VARIANT.name(), Collections.emptyMap()));
+ "", SnowflakeType.VARIANT.name(),
Collections.emptyMap()));
Assertions.assertEquals(
BasicType.STRING_TYPE,
snowflakeDataTypeConvertor.toSeaTunnelType(
- SnowflakeType.OBJECT.name(), Collections.emptyMap()));
+ "", SnowflakeType.OBJECT.name(),
Collections.emptyMap()));
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
index 4e48f938aa..02cfbc1aa6 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -48,19 +48,21 @@ import java.util.Map;
public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo> {
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
if (connectorDataType.startsWith("MAP")) {
// MAP<key,value>
int i = connectorDataType.indexOf(",");
return new MapType(
- toSeaTunnelType(connectorDataType.substring(4, i)),
+ toSeaTunnelType(field, connectorDataType.substring(4, i)),
toSeaTunnelType(
+ field,
connectorDataType.substring(i + 1,
connectorDataType.length() - 1)));
}
if (connectorDataType.startsWith("ARRAY")) {
// ARRAY<element>
SeaTunnelDataType<?> seaTunnelType =
- toSeaTunnelType(connectorDataType.substring(6,
connectorDataType.length() - 1));
+ toSeaTunnelType(
+ field, connectorDataType.substring(6,
connectorDataType.length() - 1));
switch (seaTunnelType.getSqlType()) {
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
@@ -92,9 +94,9 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
String[] fieldNames = new String[entryArray.length];
SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType<?>[entryArray.length];
for (int i = 0; i < entryArray.length; i++) {
- String[] field = entryArray[i].split(":");
- fieldNames[i] = field[0];
- fieldTypes[i] = toSeaTunnelType(field[1]);
+ String[] fieldNameAndType = entryArray[i].split(":");
+ fieldNames[i] = fieldNameAndType[0];
+ fieldTypes[i] = toSeaTunnelType(fieldNameAndType[0],
fieldNameAndType[1]);
}
return new SeaTunnelRowType(fieldNames, fieldTypes);
}
@@ -142,21 +144,21 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format(
- "SeaTunnel type not support this type [%s]
now",
- connectorDataType));
+ "SeaTunnel type not support this type [%s] of
the [%s] field now",
+ connectorDataType, field));
}
}
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- TypeInfo connectorDataType, Map<String, Object> dataTypeProperties)
+ String field, TypeInfo connectorDataType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
switch (connectorDataType.getOdpsType()) {
case MAP:
MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
return new MapType(
- toSeaTunnelType(mapTypeInfo.getKeyTypeInfo(),
dataTypeProperties),
- toSeaTunnelType(mapTypeInfo.getValueTypeInfo(),
dataTypeProperties));
+ toSeaTunnelType(field, mapTypeInfo.getKeyTypeInfo(),
dataTypeProperties),
+ toSeaTunnelType(field, mapTypeInfo.getValueTypeInfo(),
dataTypeProperties));
case ARRAY:
ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo)
connectorDataType;
switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
@@ -176,17 +178,17 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format(
- "SeaTunnel type not support this type
[%s] now",
- connectorDataType.getTypeName()));
+ "SeaTunnel type not support this type
[%s] of the [%s] field now",
+ connectorDataType.getTypeName(),
field));
}
case STRUCT:
StructTypeInfo structTypeInfo = (StructTypeInfo)
connectorDataType;
List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos();
List<String> fieldNames = new ArrayList<>(fields.size());
List<SeaTunnelDataType<?>> fieldTypes = new
ArrayList<>(fields.size());
- for (TypeInfo field : fields) {
- fieldNames.add(field.getTypeName());
- fieldTypes.add(toSeaTunnelType(field, dataTypeProperties));
+ for (TypeInfo f : fields) {
+ fieldNames.add(f.getTypeName());
+ fieldTypes.add(toSeaTunnelType(f.getTypeName(), f,
dataTypeProperties));
}
return new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
@@ -228,32 +230,35 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format(
- "SeaTunnel type not support this type [%s]
now",
- connectorDataType.getTypeName()));
+ "SeaTunnel type not support this type [%s] of
the [%s] field now",
+ connectorDataType.getTypeName(), field));
}
}
@Override
public TypeInfo toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
switch (seaTunnelDataType.getSqlType()) {
case MAP:
MapType mapType = (MapType) seaTunnelDataType;
return TypeInfoFactory.getMapTypeInfo(
- toConnectorType(mapType.getKeyType(),
dataTypeProperties),
- toConnectorType(mapType.getValueType(),
dataTypeProperties));
+ toConnectorType(field, mapType.getKeyType(),
dataTypeProperties),
+ toConnectorType(field, mapType.getValueType(),
dataTypeProperties));
case ARRAY:
ArrayType arrayType = (ArrayType) seaTunnelDataType;
return TypeInfoFactory.getArrayTypeInfo(
- toConnectorType(arrayType.getElementType(),
dataTypeProperties));
+ toConnectorType(field, arrayType.getElementType(),
dataTypeProperties));
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType)
seaTunnelDataType;
List<String> fieldNames = new
ArrayList<>(rowType.getTotalFields());
List<TypeInfo> fieldTypes = new
ArrayList<>(rowType.getTotalFields());
for (int i = 0; i < rowType.getTotalFields(); i++) {
fieldNames.add(rowType.getFieldName(i));
- fieldTypes.add(toConnectorType(rowType.getFieldType(i),
dataTypeProperties));
+ fieldTypes.add(
+ toConnectorType(field, rowType.getFieldType(i),
dataTypeProperties));
}
return TypeInfoFactory.getStructTypeInfo(fieldNames,
fieldTypes);
case TINYINT:
@@ -290,8 +295,8 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format(
- "Maxcompute type not support this type [%s]
now",
- seaTunnelDataType.getSqlType()));
+ "Maxcompute type not support this type [%s] of
the [%s] field now",
+ seaTunnelDataType.getSqlType(), field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index 8e9eaf0785..776bd32ecb 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -87,10 +87,11 @@ public class MaxcomputeTypeMapper implements Serializable {
try {
MaxComputeDataTypeConvertor typeConvertor = new
MaxComputeDataTypeConvertor();
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
- fieldNames.add(tableSchema.getColumns().get(i).getName());
+ String fieldName = tableSchema.getColumns().get(i).getName();
+ fieldNames.add(fieldName);
TypeInfo maxcomputeTypeInfo =
tableSchema.getColumns().get(i).getTypeInfo();
SeaTunnelDataType<?> seaTunnelDataType =
- typeConvertor.toSeaTunnelType(maxcomputeTypeInfo,
null);
+ typeConvertor.toSeaTunnelType(fieldName,
maxcomputeTypeInfo, null);
seaTunnelDataTypes.add(seaTunnelDataType);
}
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
index 0af30301ca..94a0e4f162 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
@@ -19,8 +19,11 @@ package
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -30,6 +33,10 @@ import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import com.aliyun.odps.type.VarcharTypeInfo;
+import java.util.HashMap;
+
+import static com.aliyun.odps.type.TypeInfoFactory.INTERVAL_DAY_TIME;
+
public class MaxComputeDataTypeConvertorTest {
private final MaxComputeDataTypeConvertor maxComputeDataTypeConvertor =
@@ -39,7 +46,7 @@ public class MaxComputeDataTypeConvertorTest {
public void testTypeInfoStrToSeaTunnelType() {
String typeInfoStr = "MAP<STRING,STRING>";
SeaTunnelDataType<?> seaTunnelType =
- maxComputeDataTypeConvertor.toSeaTunnelType(typeInfoStr);
+ maxComputeDataTypeConvertor.toSeaTunnelType("", typeInfoStr);
Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType)
seaTunnelType).getKeyType());
Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType)
seaTunnelType).getKeyType());
}
@@ -49,7 +56,7 @@ public class MaxComputeDataTypeConvertorTest {
MapTypeInfo simpleMapTypeInfo =
TypeInfoFactory.getMapTypeInfo(new VarcharTypeInfo(10), new
VarcharTypeInfo(10));
MapType seaTunnelMapType =
- (MapType)
maxComputeDataTypeConvertor.toSeaTunnelType(simpleMapTypeInfo, null);
+ (MapType) maxComputeDataTypeConvertor.toSeaTunnelType("",
simpleMapTypeInfo, null);
Assertions.assertEquals(BasicType.STRING_TYPE,
seaTunnelMapType.getKeyType());
Assertions.assertEquals(BasicType.STRING_TYPE,
seaTunnelMapType.getValueType());
}
@@ -58,7 +65,7 @@ public class MaxComputeDataTypeConvertorTest {
public void testSeaTunnelTypeToTypeInfo() {
MapType mapType = new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE);
MapTypeInfo mapTypeInfo =
- (MapTypeInfo)
maxComputeDataTypeConvertor.toConnectorType(mapType, null);
+ (MapTypeInfo) maxComputeDataTypeConvertor.toConnectorType("",
mapType, null);
Assertions.assertEquals(OdpsType.STRING,
mapTypeInfo.getKeyTypeInfo().getOdpsType());
Assertions.assertEquals(OdpsType.STRING,
mapTypeInfo.getValueTypeInfo().getOdpsType());
}
@@ -68,4 +75,35 @@ public class MaxComputeDataTypeConvertorTest {
Assertions.assertEquals(
MaxcomputeConfig.PLUGIN_NAME,
maxComputeDataTypeConvertor.getIdentity());
}
+
+ @Test
+ public void testConvertorErrorMsgWithUnsupportedType() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new
SeaTunnelDataType[0]);
+ MultipleRowType multipleRowType =
+ new MultipleRowType(new String[] {"table"}, new
SeaTunnelRowType[] {rowType});
+ MaxComputeDataTypeConvertor maxCompute = new
MaxComputeDataTypeConvertor();
+ MaxcomputeConnectorException exception =
+ Assertions.assertThrows(
+ MaxcomputeConnectorException.class,
+ () -> maxCompute.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - SeaTunnel type not support this type [UNSUPPORTED_TYPE] of the [test]
field now",
+ exception.getMessage());
+ MaxcomputeConnectorException exception2 =
+ Assertions.assertThrows(
+ MaxcomputeConnectorException.class,
+ () ->
+ maxCompute.toSeaTunnelType(
+ "test", INTERVAL_DAY_TIME, new
HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - SeaTunnel type not support this type [INTERVAL_DAY_TIME] of the [test]
field now",
+ exception2.getMessage());
+ MaxcomputeConnectorException exception3 =
+ Assertions.assertThrows(
+ MaxcomputeConnectorException.class,
+ () -> maxCompute.toConnectorType("test",
multipleRowType, new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Maxcompute type not support this type [MULTIPLE_ROW] of the [test]
field now",
+ exception3.getMessage());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
index eeda2d22a6..ca7f50e500 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
@@ -49,7 +49,7 @@ public class StarRocksDataTypeConvertor implements
DataTypeConvertor<MysqlType>
public static final Integer DEFAULT_SCALE = 0;
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
checkNotNull(connectorDataType, "connectorDataType can not be null");
MysqlType mysqlType = MysqlType.getByName(connectorDataType);
Map<String, Object> dataTypeProperties;
@@ -78,14 +78,14 @@ public class StarRocksDataTypeConvertor implements
DataTypeConvertor<MysqlType>
dataTypeProperties = Collections.emptyMap();
break;
}
- return toSeaTunnelType(mysqlType, dataTypeProperties);
+ return toSeaTunnelType(field, mysqlType, dataTypeProperties);
}
// todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType
doesn't contains
// properties.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
- MysqlType mysqlType, Map<String, Object> dataTypeProperties)
+ String field, MysqlType mysqlType, Map<String, Object>
dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(mysqlType, "mysqlType can not be null");
@@ -148,13 +148,16 @@ public class StarRocksDataTypeConvertor implements
DataTypeConvertor<MysqlType>
return new DecimalType(precision, scale);
// TODO: support 'SET' & 'YEAR' type
default:
- throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+ throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(
+ field, mysqlType);
}
}
@Override
public MysqlType toConnectorType(
- SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object>
dataTypeProperties)
+ String field,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
SqlType sqlType = seaTunnelDataType.getSqlType();
// todo: verify
@@ -193,7 +196,9 @@ public class StarRocksDataTypeConvertor implements
DataTypeConvertor<MysqlType>
default:
throw new StarRocksConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- String.format("Doesn't support type '%s' yet",
sqlType));
+ String.format(
+ "Doris doesn't support type '%s' of the '%s'
field yet",
+ sqlType, field));
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/DataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/DataTypeConvertorTest.java
new file mode 100644
index 0000000000..6521ca3b39
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/DataTypeConvertorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static com.mysql.cj.MysqlType.UNKNOWN;
+
+public class DataTypeConvertorTest {
+
+ @Test
+ void testConvertorErrorMsgWithUnsupportedType() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new
SeaTunnelDataType[0]);
+ MultipleRowType multipleRowType =
+ new MultipleRowType(new String[] {"table"}, new
SeaTunnelRowType[] {rowType});
+ StarRocksDataTypeConvertor starrocks = new
StarRocksDataTypeConvertor();
+ DataTypeConvertException exception =
+ Assertions.assertThrows(
+ DataTypeConvertException.class,
+ () -> starrocks.toSeaTunnelType("test",
"UNSUPPORTED_TYPE"));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+ exception.getMessage());
+ DataTypeConvertException exception2 =
+ Assertions.assertThrows(
+ DataTypeConvertException.class,
+ () -> starrocks.toSeaTunnelType("test", UNKNOWN, new
HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+ exception2.getMessage());
+ StarRocksConnectorException exception3 =
+ Assertions.assertThrows(
+ StarRocksConnectorException.class,
+ () -> starrocks.toConnectorType("test",
multipleRowType, new HashMap<>()));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data
type] - Doris doesn't support type 'MULTIPLE_ROW' of the 'test' field yet",
+ exception3.getMessage());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
rename to
seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
index 5942e21d3b..d692d85999 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.starrocks;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
similarity index 99%
rename from
seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java
rename to
seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index b571deb68a..6298921c2d 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.starrocks;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
index 9d966ffe2f..4e50f3ec20 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
@@ -110,7 +110,7 @@ public class KubernetesIT {
appsV1Api.createNamespacedStatefulSet(
namespace, yamlStatefulSet, null, null, null, null);
Awaitility.await()
- .atMost(30, TimeUnit.SECONDS)
+ .atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() -> {
V1StatefulSet v1StatefulSet =