This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ab60790f0d [Improve][Connector] Add field name to `DataTypeConvertor` to improve error message (#5782)
ab60790f0d is described below

commit ab60790f0d8600fe169d463b545137034d279301
Author: Jia Fan <[email protected]>
AuthorDate: Mon Nov 6 11:40:19 2023 +0800

    [Improve][Connector] Add field name to `DataTypeConvertor` to improve error message (#5782)
---
 .../table/catalog/DataTypeConvertException.java    |  27 +--
 .../api/table/catalog/DataTypeConvertor.java       |  11 +-
 .../doris/datatype/DorisDataTypeConvertor.java     |  18 +-
 .../doris/datatype/DataTypeConvertorTest.java      |  59 +++++
 .../catalog/ElasticSearchCatalog.java              |   3 +-
 .../catalog/ElasticSearchDataTypeConvertor.java    |  10 +-
 .../elasticsearch/source/ElasticsearchSource.java  |   2 +-
 .../seatunnel/jdbc/catalog/dm/DamengCatalog.java   |   8 +-
 .../jdbc/catalog/dm/DamengDataTypeConvertor.java   |  18 +-
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java |   7 +-
 .../catalog/mysql/MysqlCreateTableSqlBuilder.java  |   8 +-
 .../jdbc/catalog/mysql/MysqlDataTypeConvertor.java |  17 +-
 .../jdbc/catalog/oracle/OracleCatalog.java         |   8 +-
 .../oracle/OracleCreateTableSqlBuilder.java        |   4 +-
 .../catalog/oracle/OracleDataTypeConvertor.java    |  17 +-
 .../jdbc/catalog/psql/PostgresCatalog.java         |   7 +-
 .../psql/PostgresCreateTableSqlBuilder.java        |   4 +-
 .../catalog/psql/PostgresDataTypeConvertor.java    |  16 +-
 .../redshift/RedshiftDataTypeConvertor.java        |  16 +-
 .../snowflake/SnowflakeDataTypeConvertor.java      |  16 +-
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   |   7 +-
 .../sqlserver/SqlServerCreateTableSqlBuilder.java  |   3 +-
 .../sqlserver/SqlServerDataTypeConvertor.java      |  20 +-
 .../jdbc/catalog/tidb/TiDBDataTypeConvertor.java   |  17 +-
 .../jdbc/catalog/DataTypeConvertorTest.java        | 242 +++++++++++++++++++++
 .../jdbc/catalog/MysqlDataTypeConvertorTest.java   |  11 +-
 .../catalog/SnowflakeDataTypeConvertorTest.java    |   6 +-
 .../catalog/MaxComputeDataTypeConvertor.java       |  55 ++---
 .../maxcompute/util/MaxcomputeTypeMapper.java      |   5 +-
 .../catalog/MaxComputeDataTypeConvertorTest.java   |  44 +++-
 .../catalog/StarRocksDataTypeConvertor.java        |  17 +-
 .../starrocks/catalog/DataTypeConvertorTest.java   |  63 ++++++
 .../{ => catalog}/StarRocksCatalogTest.java        |   3 +-
 .../{ => catalog}/StarRocksCreateTableTest.java    |   2 +-
 .../seatunnel/engine/e2e/k8s/KubernetesIT.java     |   2 +-
 35 files changed, 618 insertions(+), 155 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
index 51f61dd6e4..c5649f548e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
@@ -17,16 +17,12 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 public class DataTypeConvertException extends SeaTunnelRuntimeException {
     private static final String CONVERT_TO_SEA_TUNNEL_ERROR_MSG =
-            "Convert type: %s to SeaTunnel data type error.";
-
-    private static final String CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG =
-            "Convert SeaTunnel data type: %s to connector data type error.";
+            "Convert type: %s of the %s field to SeaTunnel data type error.";
 
     public DataTypeConvertException(String message) {
         this(message, null);
@@ -36,26 +32,9 @@ public class DataTypeConvertException extends 
SeaTunnelRuntimeException {
         super(CommonErrorCode.UNSUPPORTED_DATA_TYPE, message, cause);
     }
 
-    public static DataTypeConvertException 
convertToSeaTunnelDataTypeException(Object dataType) {
-        return new DataTypeConvertException(
-                String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, dataType));
-    }
-
     public static DataTypeConvertException convertToSeaTunnelDataTypeException(
-            Object dataType, Throwable cause) {
-        return new DataTypeConvertException(
-                String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, dataType), 
cause);
-    }
-
-    public static DataTypeConvertException convertToConnectorDataTypeException(
-            SeaTunnelDataType<?> seaTunnelDataType) {
-        return new DataTypeConvertException(
-                String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG, 
seaTunnelDataType));
-    }
-
-    public static DataTypeConvertException convertToConnectorDataTypeException(
-            SeaTunnelDataType<?> seaTunnelDataType, Throwable cause) {
+            String field, Object dataType) {
         return new DataTypeConvertException(
-                String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG, 
seaTunnelDataType), cause);
+                String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, dataType, 
field));
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
index fb5a152dc3..31e4e818e8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -28,26 +28,29 @@ public interface DataTypeConvertor<T> {
     /**
      * Transfer the data type from connector to SeaTunnel.
      *
+     * @param field The field name of the column
      * @param connectorDataType e.g. "int", "varchar(255)"
      * @return the data type of SeaTunnel
      */
-    SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType);
+    SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType);
 
     /**
      * Transfer the data type from connector to SeaTunnel.
      *
+     * @param field The field name of the column
      * @param connectorDataType origin data type
      * @param dataTypeProperties origin data type properties, e.g. precision, 
scale, length
      * @return SeaTunnel data type
      */
     // todo: If the origin data type contains the properties, we can remove 
the dataTypeProperties.
     SeaTunnelDataType<?> toSeaTunnelType(
-            T connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, T connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException;
 
     /**
      * Transfer the data type from SeaTunnel to connector.
      *
+     * @param field The field name of the column
      * @param seaTunnelDataType seaTunnel data type
      * @param dataTypeProperties seaTunnel data type properties, e.g. 
precision, scale, length
      * @return origin data type
@@ -55,7 +58,9 @@ public interface DataTypeConvertor<T> {
     // todo: If the SeaTunnel data type contains the properties, we can remove 
the
     // dataTypeProperties.
     T toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException;
 
     String getIdentity();
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
index 7c9f08dfb7..40366ddd31 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
@@ -73,7 +73,7 @@ public class DorisDataTypeConvertor implements 
DataTypeConvertor<String> {
     public static final Integer DEFAULT_SCALE = 0;
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
         checkNotNull(connectorDataType, "connectorDataType can not be null");
         Map<String, Object> dataTypeProperties;
         switch (connectorDataType.toUpperCase(Locale.ROOT)) {
@@ -99,12 +99,12 @@ public class DorisDataTypeConvertor implements 
DataTypeConvertor<String> {
                 dataTypeProperties = Collections.emptyMap();
                 break;
         }
-        return toSeaTunnelType(connectorDataType, dataTypeProperties);
+        return toSeaTunnelType(field, connectorDataType, dataTypeProperties);
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            String connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, String connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(connectorDataType, "mysqlType can not be null");
         int precision;
@@ -145,13 +145,17 @@ public class DorisDataTypeConvertor implements 
DataTypeConvertor<String> {
                 return new DecimalType(precision, scale);
             default:
                 throw new UnsupportedOperationException(
-                        String.format("Doesn't support DORIS type '%s''  
yet.", connectorDataType));
+                        String.format(
+                                "Doesn't support Doris type '%s' of the '%s' 
field yet.",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public String toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
         SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -189,7 +193,9 @@ public class DorisDataTypeConvertor implements 
DataTypeConvertor<String> {
                 return TIMESTAMP;
             default:
                 throw new UnsupportedOperationException(
-                        String.format("Doesn't support Doris type '%s''  
yet.", sqlType));
+                        String.format(
+                                "Doris doesn't support SeaTunnel type '%s' of 
the '%s' field yet.",
+                                sqlType, field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DataTypeConvertorTest.java
new file mode 100644
index 0000000000..9a0d46c11a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DataTypeConvertorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.datatype;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+public class DataTypeConvertorTest {
+
+    @Test
+    void testConvertorErrorMsgWithUnsupportedType() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new 
SeaTunnelDataType[0]);
+        MultipleRowType multipleRowType =
+                new MultipleRowType(new String[] {"table"}, new 
SeaTunnelRowType[] {rowType});
+        DorisDataTypeConvertor doris = new DorisDataTypeConvertor();
+        UnsupportedOperationException exception =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> doris.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "Doesn't support Doris type 'UNSUPPORTED_TYPE' of the 'test' 
field yet.",
+                exception.getMessage());
+        UnsupportedOperationException exception2 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> doris.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE", new HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support Doris type 'UNSUPPORTED_TYPE' of the 'test' 
field yet.",
+                exception2.getMessage());
+        UnsupportedOperationException exception3 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> doris.toConnectorType("test", multipleRowType, 
new HashMap<>()));
+        Assertions.assertEquals(
+                "Doris doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 
'test' field yet.",
+                exception3.getMessage());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 3edaaa2846..8ca60925b0 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -148,7 +148,8 @@ public class ElasticSearchCatalog implements Catalog {
                     PhysicalColumn physicalColumn =
                             PhysicalColumn.of(
                                     fieldName,
-                                    
elasticSearchDataTypeConvertor.toSeaTunnelType(fieldType),
+                                    
elasticSearchDataTypeConvertor.toSeaTunnelType(
+                                            fieldName, fieldType),
                                     null,
                                     true,
                                     null,
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index a1bd50bbda..acd3d6aba9 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -47,13 +47,13 @@ public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<String>
     public static final String DATE = "date";
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
-        return toSeaTunnelType(connectorDataType, null);
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
+        return toSeaTunnelType(field, connectorDataType, null);
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            String connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, String connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(connectorDataType, "connectorDataType can not be null");
         switch (connectorDataType) {
@@ -88,7 +88,9 @@ public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<String>
 
     @Override
     public String toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
         SqlType sqlType = seaTunnelDataType.getSqlType();
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index e577695e9f..c7891745f5 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -79,7 +79,7 @@ public class ElasticsearchSource
             for (int i = 0; i < source.size(); i++) {
                 String esType = esFieldType.get(source.get(i));
                 SeaTunnelDataType seaTunnelDataType =
-                        elasticSearchDataTypeConvertor.toSeaTunnelType(esType);
+                        
elasticSearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
                 fieldTypes[i] = seaTunnelDataType;
             }
             rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]), 
fieldTypes);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index a93a7ee560..edf3e1b380 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -122,7 +122,8 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         Object defaultValue = resultSet.getObject("DATA_DEFAULT");
         boolean isNullable = resultSet.getString("NULLABLE").equals("Y");
 
-        SeaTunnelDataType<?> type = fromJdbcType(typeName, columnPrecision, 
columnScale);
+        SeaTunnelDataType<?> type =
+                fromJdbcType(columnName, typeName, columnPrecision, 
columnScale);
 
         return PhysicalColumn.of(
                 columnName,
@@ -139,11 +140,12 @@ public class DamengCatalog extends AbstractJdbcCatalog {
                 columnLength);
     }
 
-    private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, 
long scale) {
+    private SeaTunnelDataType<?> fromJdbcType(
+            String columnName, String typeName, long precision, long scale) {
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(DamengDataTypeConvertor.PRECISION, precision);
         dataTypeProperties.put(DamengDataTypeConvertor.SCALE, scale);
-        return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, 
dataTypeProperties);
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, typeName, 
dataTypeProperties);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
index 1a26f95dc2..5cad2122ff 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengDataTypeConvertor.java
@@ -119,12 +119,13 @@ public class DamengDataTypeConvertor implements 
DataTypeConvertor<String> {
     }
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String dataType) {
-        return toSeaTunnelType(dataType, Collections.emptyMap());
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String dataType) 
{
+        return toSeaTunnelType(field, dataType, Collections.emptyMap());
     }
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String dataType, Map<String, 
Object> properties)
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            String field, String dataType, Map<String, Object> properties)
             throws DataTypeConvertException {
         switch (dataType.toUpperCase()) {
             case DM_BIT:
@@ -201,12 +202,15 @@ public class DamengDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new JdbcConnectorException(
                         CommonErrorCode.UNSUPPORTED_OPERATION,
-                        String.format("Doesn't support Dmdb type '%s' yet.", 
dataType));
+                        String.format(
+                                "Doesn't support DMDB type '%s' of the '%s' 
field yet.",
+                                dataType, field));
         }
     }
 
     @Override
-    public String toConnectorType(SeaTunnelDataType<?> dataType, Map<String, 
Object> properties)
+    public String toConnectorType(
+            String field, SeaTunnelDataType<?> dataType, Map<String, Object> 
properties)
             throws DataTypeConvertException {
         SqlType sqlType = dataType.getSqlType();
         switch (sqlType) {
@@ -238,7 +242,9 @@ public class DamengDataTypeConvertor implements 
DataTypeConvertor<String> {
                 return DM_BINARY;
             default:
                 throw new UnsupportedOperationException(
-                        String.format("Doesn't support SeaTunnel type '%s' 
yet.", dataType));
+                        String.format(
+                                "Doesn't support SeaTunnel type '%s' of the 
'%s' field yet.",
+                                dataType, field));
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 1b4a8f3425..6bb1274a80 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -129,7 +129,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         if (sourceType.toLowerCase(Locale.ROOT).contains("unsigned")) {
             typeName += "_UNSIGNED";
         }
-        SeaTunnelDataType<?> type = fromJdbcType(typeName, precision, scale);
+        SeaTunnelDataType<?> type = fromJdbcType(columnName, typeName, 
precision, scale);
         String comment = resultSet.getString("COLUMN_COMMENT");
         Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
         String isNullableStr = resultSet.getString("IS_NULLABLE");
@@ -198,12 +198,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         return String.format("DROP DATABASE `%s`;", databaseName);
     }
 
-    private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision, 
int scale) {
+    private SeaTunnelDataType<?> fromJdbcType(
+            String columnName, String typeName, int precision, int scale) {
         MysqlType mysqlType = MysqlType.getByName(typeName);
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, precision);
         dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, scale);
-        return DATA_TYPE_CONVERTOR.toSeaTunnelType(mysqlType, 
dataTypeProperties);
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, mysqlType, 
dataTypeProperties);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 5f44495a0a..41189ad2ac 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -213,14 +213,18 @@ public class MysqlCreateTableSqlBuilder {
         } else {
             // Column type
             final String name =
-                    
mysqlDataTypeConvertor.toConnectorType(column.getDataType(), null).getName();
+                    mysqlDataTypeConvertor
+                            .toConnectorType(column.getName(), 
column.getDataType(), null)
+                            .getName();
             if (columnLength == 0
                     && StringUtils.equalsIgnoreCase(name, 
MysqlType.VARCHAR.getName())) {
                 columnSqls.add(MysqlType.LONGTEXT.getName());
                 return;
             }
             columnSqls.add(
-                    
mysqlDataTypeConvertor.toConnectorType(column.getDataType(), null).getName());
+                    mysqlDataTypeConvertor
+                            .toConnectorType(column.getName(), 
column.getDataType(), null)
+                            .getName());
 
             String fieSql = "";
             List<String> list = new ArrayList<>();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
index f535b3580b..abe6fda112 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
@@ -51,7 +51,7 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
     public static final Integer DEFAULT_SCALE = 0;
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
         checkNotNull(connectorDataType, "connectorDataType can not be null");
         MysqlType mysqlType = MysqlType.getByName(connectorDataType);
         Map<String, Object> dataTypeProperties;
@@ -81,14 +81,14 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
                 dataTypeProperties = Collections.emptyMap();
                 break;
         }
-        return toSeaTunnelType(mysqlType, dataTypeProperties);
+        return toSeaTunnelType(field, mysqlType, dataTypeProperties);
     }
 
     // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType 
doesn't contains
     // properties.
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            MysqlType mysqlType, Map<String, Object> dataTypeProperties)
+            String field, MysqlType mysqlType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(mysqlType, "mysqlType can not be null");
         int precision;
@@ -158,13 +158,16 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
                 return new DecimalType(precision, scale);
                 // TODO: support 'SET' & 'YEAR' type
             default:
-                throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+                throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(
+                        field, mysqlType);
         }
     }
 
     @Override
     public MysqlType toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         SqlType sqlType = seaTunnelDataType.getSqlType();
         // todo: verify
@@ -205,7 +208,9 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
             default:
                 throw new JdbcConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        String.format("Doesn't support MySQL type '%s' yet", 
sqlType));
+                        String.format(
+                                "Doesn't support MySQL type '%s' of the '%s' 
field yet",
+                                sqlType, field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 0097a6008c..606b0a8f3f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -174,7 +174,8 @@ public class OracleCatalog extends AbstractJdbcCatalog {
         Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
         boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
 
-        SeaTunnelDataType<?> type = fromJdbcType(typeName, columnPrecision, 
columnScale);
+        SeaTunnelDataType<?> type =
+                fromJdbcType(columnName, typeName, columnPrecision, 
columnScale);
         long bitLen = 0;
         switch (typeName) {
             case ORACLE_LONG:
@@ -214,11 +215,12 @@ public class OracleCatalog extends AbstractJdbcCatalog {
                 columnLength);
     }
 
-    private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, 
long scale) {
+    private SeaTunnelDataType<?> fromJdbcType(
+            String columnName, String typeName, long precision, long scale) {
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(OracleDataTypeConvertor.PRECISION, precision);
         dataTypeProperties.put(OracleDataTypeConvertor.SCALE, scale);
-        return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, 
dataTypeProperties);
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, typeName, 
dataTypeProperties);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
index b10cdd93dc..99bcfedd6d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
@@ -122,7 +122,9 @@ public class OracleCreateTableSqlBuilder {
                     return "CLOB";
                 }
             default:
-                String type = 
oracleDataTypeConvertor.toConnectorType(column.getDataType(), null);
+                String type =
+                        oracleDataTypeConvertor.toConnectorType(
+                                column.getName(), column.getDataType(), null);
                 if (type.equals("NUMBER")) {
                     if (column.getDataType() instanceof DecimalType) {
                         DecimalType decimalType = (DecimalType) 
column.getDataType();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
index f290fa2688..24ca049ce9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleDataTypeConvertor.java
@@ -79,13 +79,13 @@ public class OracleDataTypeConvertor implements 
DataTypeConvertor<String> {
     public static final String ORACLE_LONG_RAW = "LONG RAW";
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
-        return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
+        return toSeaTunnelType(field, connectorDataType, 
Collections.emptyMap());
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            String connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, String connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(connectorDataType, "Oracle Type cannot be null");
         connectorDataType = normalizeTimestamp(connectorDataType);
@@ -142,13 +142,17 @@ public class OracleDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new JdbcConnectorException(
                         CommonErrorCode.UNSUPPORTED_OPERATION,
-                        String.format("Doesn't support ORACLE type '%s' yet.", 
connectorDataType));
+                        String.format(
+                                "Doesn't support ORACLE type '%s' of the '%s' 
field yet.",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public String toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
         SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -178,7 +182,8 @@ public class OracleDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support SeaTunnel type '%s' yet.", 
seaTunnelDataType));
+                                "Doesn't support SeaTunnel type '%s' of the 
'%s' field yet.",
+                                seaTunnelDataType.getSqlType(), field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index af89a6c6c1..0a447eb1e2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -145,7 +145,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
             defaultValue = null;
         }
 
-        SeaTunnelDataType<?> type = fromJdbcType(typeName, columnLength, 
columnScale);
+        SeaTunnelDataType<?> type = fromJdbcType(columnName, typeName, 
columnLength, columnScale);
         long bitLen = 0;
         switch (typeName) {
             case PG_BYTEA:
@@ -241,11 +241,12 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         super.dropDatabaseInternal(databaseName);
     }
 
-    private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, 
long scale) {
+    private SeaTunnelDataType<?> fromJdbcType(
+            String columnName, String typeName, long precision, long scale) {
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(PostgresDataTypeConvertor.PRECISION, precision);
         dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, scale);
-        return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, 
dataTypeProperties);
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, typeName, 
dataTypeProperties);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
index 43269e452c..52266f6a90 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
@@ -157,7 +157,9 @@ public class PostgresCreateTableSqlBuilder {
                     return "text";
                 }
             default:
-                String type = 
postgresDataTypeConvertor.toConnectorType(column.getDataType(), null);
+                String type =
+                        postgresDataTypeConvertor.toConnectorType(
+                                column.getName(), column.getDataType(), null);
                 if (type.equals(PG_NUMERIC)) {
                     DecimalType decimalType = (DecimalType) 
column.getDataType();
                     return "numeric("
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
index 2eaa87f862..1599bd84d6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
@@ -111,13 +111,13 @@ public class PostgresDataTypeConvertor implements 
DataTypeConvertor<String> {
     public static final String PG_XML = "xml";
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
-        return toSeaTunnelType(connectorDataType, new HashMap<>(0));
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
+        return toSeaTunnelType(field, connectorDataType, new HashMap<>(0));
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            String connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, String connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(connectorDataType, "Postgres Type cannot be null");
         switch (connectorDataType) {
@@ -189,13 +189,16 @@ public class PostgresDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support POSTGRES type '%s''  yet.", 
connectorDataType));
+                                "Doesn't support POSTGRES type '%s' of the 
'%s' field yet.",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public String toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
         SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -228,7 +231,8 @@ public class PostgresDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support SeaTunnel type '%s''  yet.", 
seaTunnelDataType));
+                                "Doesn't support SeaTunnel type '%s' of the 
'%s' field yet.",
+                                seaTunnelDataType.getSqlType(), field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index 9bcb52637a..db5984aeec 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -95,13 +95,13 @@ public class RedshiftDataTypeConvertor implements 
DataTypeConvertor<String> {
     private static final String REDSHIFT_TIMESTAMPTZ = "timestamptz";
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
-        return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
+        return toSeaTunnelType(field, connectorDataType, 
Collections.emptyMap());
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            String connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, String connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(connectorDataType, "redshiftType cannot be null");
         switch (connectorDataType) {
@@ -157,13 +157,16 @@ public class RedshiftDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support REDSHIFT type '%s''  yet.", 
connectorDataType));
+                                "Doesn't support REDSHIFT type '%s' of the 
'%s' field yet.",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public String toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
         SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -196,7 +199,8 @@ public class RedshiftDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support SeaTunnel type '%s''  yet.", 
seaTunnelDataType));
+                                "Doesn't support SeaTunnel type '%s' of the 
'%s' field yet.",
+                                seaTunnelDataType.getSqlType(), field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
index 17dc5337fa..a6477a8e43 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
@@ -88,13 +88,13 @@ public class SnowflakeDataTypeConvertor implements 
DataTypeConvertor<String> {
     private static final String SNOWFLAKE_OBJECT = "OBJECT";
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
-        return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
+        return toSeaTunnelType(field, connectorDataType, 
Collections.emptyMap());
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            String connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, String connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(connectorDataType, "redshiftType cannot be null");
 
@@ -151,13 +151,16 @@ public class SnowflakeDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support SNOWFLAKE type '%s' yet.", 
connectorDataType));
+                                "Doesn't support SNOWFLAKE type '%s' of the 
'%s' field yet.",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public String toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
         SqlType sqlType = seaTunnelDataType.getSqlType();
@@ -191,7 +194,8 @@ public class SnowflakeDataTypeConvertor implements 
DataTypeConvertor<String> {
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Doesn't support SeaTunnel type '%s''  yet.", 
seaTunnelDataType));
+                                "Doesn't support SeaTunnel type '%s' of the 
'%s' field yet.",
+                                seaTunnelDataType.getSqlType(), field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 5c64dda104..478ef873c7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -105,7 +105,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         int precision = resultSet.getInt("precision");
         int scale = resultSet.getInt("scale");
         long columnLength = resultSet.getLong("max_length");
-        SeaTunnelDataType<?> type = fromJdbcType(sourceType, precision, scale);
+        SeaTunnelDataType<?> type = fromJdbcType(columnName, sourceType, 
precision, scale);
         String comment = resultSet.getString("comment");
         Object defaultValue = resultSet.getObject("default_value");
         if (defaultValue != null) {
@@ -178,12 +178,13 @@ public class SqlServerCatalog extends AbstractJdbcCatalog 
{
                 columnLength);
     }
 
-    private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision, 
int scale) {
+    private SeaTunnelDataType<?> fromJdbcType(
+            String columnName, String typeName, int precision, int scale) {
         Pair<SqlServerType, Map<String, Object>> pair = 
SqlServerType.parse(typeName);
         Map<String, Object> dataTypeProperties = new HashMap<>();
         dataTypeProperties.put(SqlServerDataTypeConvertor.PRECISION, 
precision);
         dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, scale);
-        return DATA_TYPE_CONVERTOR.toSeaTunnelType(pair.getLeft(), 
dataTypeProperties);
+        return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnName, pair.getLeft(), 
dataTypeProperties);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
index 3360f4514a..24601ba145 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
@@ -218,7 +218,8 @@ public class SqlServerCreateTableSqlBuilder {
             } else {
                 // Add column type
                 SqlServerType sqlServerType =
-                        
sqlServerDataTypeConvertor.toConnectorType(column.getDataType(), null);
+                        sqlServerDataTypeConvertor.toConnectorType(
+                                column.getName(), column.getDataType(), null);
                 String typeName = sqlServerType.getName();
                 String fieldSuffixSql = null;
                 tyNameDef = typeName;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index 421c3a3de5..df9d956ce1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -44,15 +44,17 @@ public class SqlServerDataTypeConvertor implements 
DataTypeConvertor<SqlServerTy
     public static final Integer DEFAULT_SCALE = 0;
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(@NonNull String 
connectorDataType) {
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, @NonNull String 
connectorDataType) {
         Pair<SqlServerType, Map<String, Object>> sqlServerType =
                 SqlServerType.parse(connectorDataType);
-        return toSeaTunnelType(sqlServerType.getLeft(), 
sqlServerType.getRight());
+        return toSeaTunnelType(field, sqlServerType.getLeft(), 
sqlServerType.getRight());
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            @NonNull SqlServerType connectorDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            @NonNull SqlServerType connectorDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         switch (connectorDataType) {
             case BIT:
@@ -101,13 +103,17 @@ public class SqlServerDataTypeConvertor implements 
DataTypeConvertor<SqlServerTy
             default:
                 throw new JdbcConnectorException(
                         CommonErrorCode.UNSUPPORTED_OPERATION,
-                        String.format("Doesn't support SQLSERVER type '%s'", 
connectorDataType));
+                        String.format(
+                                "Doesn't support SQLSERVER type '%s' of the 
'%s' field",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public SqlServerType toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         SqlType sqlType = seaTunnelDataType.getSqlType();
         switch (sqlType) {
@@ -140,7 +146,9 @@ public class SqlServerDataTypeConvertor implements 
DataTypeConvertor<SqlServerTy
             default:
                 throw new JdbcConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        String.format("Doesn't support SqlServer type '%s' 
yet", sqlType));
+                        String.format(
+                                "Doesn't support SqlServer type '%s' of the 
'%s' field yet",
+                                sqlType, field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
index 18b7db3187..7937d3059a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBDataTypeConvertor.java
@@ -51,7 +51,7 @@ public class TiDBDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
     public static final Integer DEFAULT_SCALE = 0;
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
         checkNotNull(connectorDataType, "connectorDataType can not be null");
         MysqlType mysqlType = MysqlType.getByName(connectorDataType);
         Map<String, Object> dataTypeProperties;
@@ -80,14 +80,14 @@ public class TiDBDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
                 dataTypeProperties = Collections.emptyMap();
                 break;
         }
-        return toSeaTunnelType(mysqlType, dataTypeProperties);
+        return toSeaTunnelType(field, mysqlType, dataTypeProperties);
     }
 
     // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType 
doesn't contains
     // properties.
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            MysqlType mysqlType, Map<String, Object> dataTypeProperties)
+            String field, MysqlType mysqlType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(mysqlType, "mysqlType can not be null");
         int precision;
@@ -157,13 +157,16 @@ public class TiDBDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
                 return new DecimalType(precision, scale);
                 // TODO: support 'SET' & 'YEAR' type
             default:
-                throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+                throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(
+                        field, mysqlType);
         }
     }
 
     @Override
     public MysqlType toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         SqlType sqlType = seaTunnelDataType.getSqlType();
         // todo: verify
@@ -201,7 +204,9 @@ public class TiDBDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
             default:
                 throw new JdbcConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        String.format("Doesn't support TiDB type '%s' yet", 
sqlType));
+                        String.format(
+                                "TiDB doesn't support SeaTunnel type '%s' of 
the '%s' field yet",
+                                sqlType, field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/DataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/DataTypeConvertorTest.java
new file mode 100644
index 0000000000..4fa195e2df
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/DataTypeConvertorTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm.DamengDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift.RedshiftDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.snowflake.SnowflakeDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static com.mysql.cj.MysqlType.UNKNOWN;
+
+public class DataTypeConvertorTest {
+
+    @Test
+    void testConvertorErrorMsgWithUnsupportedType() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new 
SeaTunnelDataType[0]);
+        MultipleRowType multipleRowType =
+                new MultipleRowType(new String[] {"table"}, new 
SeaTunnelRowType[] {rowType});
+
+        DamengDataTypeConvertor dameng = new DamengDataTypeConvertor();
+        JdbcConnectorException exception =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> dameng.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported 
operation] - Doesn't support DMDB type 'UNSUPPORTED_TYPE' of the 'test' field 
yet.",
+                exception.getMessage());
+        JdbcConnectorException exception2 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> dameng.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE", new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported 
operation] - Doesn't support DMDB type 'UNSUPPORTED_TYPE' of the 'test' field 
yet.",
+                exception2.getMessage());
+        UnsupportedOperationException exception3 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> dameng.toConnectorType("test", rowType, new 
HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support SeaTunnel type 'ROW<>' of the 'test' field 
yet.",
+                exception3.getMessage());
+
+        MysqlDataTypeConvertor mysql = new MysqlDataTypeConvertor();
+        DataTypeConvertException exception4 =
+                Assertions.assertThrows(
+                        DataTypeConvertException.class,
+                        () -> mysql.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+                exception4.getMessage());
+        DataTypeConvertException exception5 =
+                Assertions.assertThrows(
+                        DataTypeConvertException.class,
+                        () -> mysql.toSeaTunnelType("test", UNKNOWN, new 
HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+                exception5.getMessage());
+        JdbcConnectorException exception6 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> mysql.toConnectorType("test", multipleRowType, 
new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - Doesn't support MySQL type 'MULTIPLE_ROW' of the 'test' field yet",
+                exception6.getMessage());
+
+        OracleDataTypeConvertor oracle = new OracleDataTypeConvertor();
+        JdbcConnectorException exception7 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> oracle.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported 
operation] - Doesn't support ORACLE type 'UNSUPPORTED_TYPE' of the 'test' field 
yet.",
+                exception7.getMessage());
+        JdbcConnectorException exception8 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> oracle.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE", new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported 
operation] - Doesn't support ORACLE type 'UNSUPPORTED_TYPE' of the 'test' field 
yet.",
+                exception8.getMessage());
+        UnsupportedOperationException exception9 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> oracle.toConnectorType("test", multipleRowType, 
new HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test' 
field yet.",
+                exception9.getMessage());
+
+        PostgresDataTypeConvertor postgres = new PostgresDataTypeConvertor();
+        UnsupportedOperationException exception10 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> postgres.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "Doesn't support POSTGRES type 'UNSUPPORTED_TYPE' of the 
'test' field yet.",
+                exception10.getMessage());
+        UnsupportedOperationException exception11 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () ->
+                                postgres.toSeaTunnelType(
+                                        "test", "UNSUPPORTED_TYPE", new 
HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support POSTGRES type 'UNSUPPORTED_TYPE' of the 
'test' field yet.",
+                exception11.getMessage());
+        UnsupportedOperationException exception12 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> postgres.toConnectorType("test", 
multipleRowType, new HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test' 
field yet.",
+                exception12.getMessage());
+
+        RedshiftDataTypeConvertor redshift = new RedshiftDataTypeConvertor();
+        UnsupportedOperationException exception13 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> redshift.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "Doesn't support REDSHIFT type 'UNSUPPORTED_TYPE' of the 
'test' field yet.",
+                exception13.getMessage());
+        UnsupportedOperationException exception14 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () ->
+                                redshift.toSeaTunnelType(
+                                        "test", "UNSUPPORTED_TYPE", new 
HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support REDSHIFT type 'UNSUPPORTED_TYPE' of the 
'test' field yet.",
+                exception14.getMessage());
+        UnsupportedOperationException exception15 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> redshift.toConnectorType("test", 
multipleRowType, new HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test' 
field yet.",
+                exception15.getMessage());
+
+        SnowflakeDataTypeConvertor snowflake = new 
SnowflakeDataTypeConvertor();
+        UnsupportedOperationException exception16 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> snowflake.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "Doesn't support SNOWFLAKE type 'UNSUPPORTED_TYPE' of the 
'test' field yet.",
+                exception16.getMessage());
+        UnsupportedOperationException exception17 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () ->
+                                snowflake.toSeaTunnelType(
+                                        "test", "UNSUPPORTED_TYPE", new 
HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support SNOWFLAKE type 'UNSUPPORTED_TYPE' of the 
'test' field yet.",
+                exception17.getMessage());
+        UnsupportedOperationException exception18 =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> snowflake.toConnectorType("test", 
multipleRowType, new HashMap<>()));
+        Assertions.assertEquals(
+                "Doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test' 
field yet.",
+                exception18.getMessage());
+
+        SqlServerDataTypeConvertor sqlserver = new 
SqlServerDataTypeConvertor();
+        JdbcConnectorException exception19 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> sqlserver.toSeaTunnelType("test", "unknown"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported 
operation] - Doesn't support SQLSERVER type 'UNKNOWN' of the 'test' field",
+                exception19.getMessage());
+        JdbcConnectorException exception20 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () ->
+                                sqlserver.toSeaTunnelType(
+                                        "test", SqlServerType.UNKNOWN, new 
HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported 
operation] - Doesn't support SQLSERVER type 'UNKNOWN' of the 'test' field",
+                exception20.getMessage());
+        JdbcConnectorException exception21 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> sqlserver.toConnectorType("test", 
multipleRowType, new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - Doesn't support SqlServer type 'MULTIPLE_ROW' of the 'test' field yet",
+                exception21.getMessage());
+
+        TiDBDataTypeConvertor tidb = new TiDBDataTypeConvertor();
+        DataTypeConvertException exception22 =
+                Assertions.assertThrows(
+                        DataTypeConvertException.class,
+                        () -> tidb.toSeaTunnelType("test", 
"UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+                exception22.getMessage());
+        DataTypeConvertException exception23 =
+                Assertions.assertThrows(
+                        DataTypeConvertException.class,
+                        () -> tidb.toSeaTunnelType("test", UNKNOWN, new 
HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+                exception23.getMessage());
+        JdbcConnectorException exception24 =
+                Assertions.assertThrows(
+                        JdbcConnectorException.class,
+                        () -> tidb.toConnectorType("test", multipleRowType, 
new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data 
type] - TiDB doesn't support SeaTunnel type 'MULTIPLE_ROW' of the 'test' field 
yet",
+                exception24.getMessage());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
index 45cb7ec52d..0a23a84bb7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -35,22 +35,23 @@ public class MysqlDataTypeConvertorTest {
     @Test
     public void toSeaTunnelTypeWithString() {
         Assertions.assertEquals(
-                new DecimalType(5, 2), 
mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5,2)"));
+                new DecimalType(5, 2), 
mysqlDataTypeConvertor.toSeaTunnelType("", "DECIMAL(5,2)"));
 
         Assertions.assertEquals(
-                new DecimalType(5, 0), 
mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5)"));
+                new DecimalType(5, 0), 
mysqlDataTypeConvertor.toSeaTunnelType("", "DECIMAL(5)"));
 
         Assertions.assertEquals(
-                new DecimalType(10, 0), 
mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL"));
+                new DecimalType(10, 0), 
mysqlDataTypeConvertor.toSeaTunnelType("", "DECIMAL"));
     }
 
     @Test
     public void toSeaTunnelType() {
         Assertions.assertEquals(
                 BasicType.VOID_TYPE,
-                mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, 
Collections.emptyMap()));
+                mysqlDataTypeConvertor.toSeaTunnelType("", MysqlType.NULL, 
Collections.emptyMap()));
         Assertions.assertEquals(
                 BasicType.STRING_TYPE,
-                mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR, 
Collections.emptyMap()));
+                mysqlDataTypeConvertor.toSeaTunnelType(
+                        "", MysqlType.VARCHAR, Collections.emptyMap()));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
index e0e8cef8c4..acf7fa6b0e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
@@ -37,16 +37,16 @@ public class SnowflakeDataTypeConvertorTest {
         Assertions.assertEquals(
                 BasicType.STRING_TYPE,
                 snowflakeDataTypeConvertor.toSeaTunnelType(
-                        SnowflakeType.TEXT.name(), Collections.emptyMap()));
+                        "", SnowflakeType.TEXT.name(), 
Collections.emptyMap()));
 
         Assertions.assertEquals(
                 BasicType.STRING_TYPE,
                 snowflakeDataTypeConvertor.toSeaTunnelType(
-                        SnowflakeType.VARIANT.name(), Collections.emptyMap()));
+                        "", SnowflakeType.VARIANT.name(), 
Collections.emptyMap()));
 
         Assertions.assertEquals(
                 BasicType.STRING_TYPE,
                 snowflakeDataTypeConvertor.toSeaTunnelType(
-                        SnowflakeType.OBJECT.name(), Collections.emptyMap()));
+                        "", SnowflakeType.OBJECT.name(), 
Collections.emptyMap()));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
index 4e48f938aa..02cfbc1aa6 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -48,19 +48,21 @@ import java.util.Map;
 public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo> {
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
         if (connectorDataType.startsWith("MAP")) {
             // MAP<key,value>
             int i = connectorDataType.indexOf(",");
             return new MapType(
-                    toSeaTunnelType(connectorDataType.substring(4, i)),
+                    toSeaTunnelType(field, connectorDataType.substring(4, i)),
                     toSeaTunnelType(
+                            field,
                             connectorDataType.substring(i + 1, 
connectorDataType.length() - 1)));
         }
         if (connectorDataType.startsWith("ARRAY")) {
             // ARRAY<element>
             SeaTunnelDataType<?> seaTunnelType =
-                    toSeaTunnelType(connectorDataType.substring(6, 
connectorDataType.length() - 1));
+                    toSeaTunnelType(
+                            field, connectorDataType.substring(6, 
connectorDataType.length() - 1));
             switch (seaTunnelType.getSqlType()) {
                 case STRING:
                     return ArrayType.STRING_ARRAY_TYPE;
@@ -92,9 +94,9 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
             String[] fieldNames = new String[entryArray.length];
             SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType<?>[entryArray.length];
             for (int i = 0; i < entryArray.length; i++) {
-                String[] field = entryArray[i].split(":");
-                fieldNames[i] = field[0];
-                fieldTypes[i] = toSeaTunnelType(field[1]);
+                String[] fieldNameAndType = entryArray[i].split(":");
+                fieldNames[i] = fieldNameAndType[0];
+                fieldTypes[i] = toSeaTunnelType(fieldNameAndType[0], 
fieldNameAndType[1]);
             }
             return new SeaTunnelRowType(fieldNames, fieldTypes);
         }
@@ -142,21 +144,21 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
                 throw new MaxcomputeConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                         String.format(
-                                "SeaTunnel type not support this type [%s] 
now",
-                                connectorDataType));
+                                "SeaTunnel type not support this type [%s] of 
the [%s] field now",
+                                connectorDataType, field));
         }
     }
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            TypeInfo connectorDataType, Map<String, Object> dataTypeProperties)
+            String field, TypeInfo connectorDataType, Map<String, Object> 
dataTypeProperties)
             throws DataTypeConvertException {
         switch (connectorDataType.getOdpsType()) {
             case MAP:
                 MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
                 return new MapType(
-                        toSeaTunnelType(mapTypeInfo.getKeyTypeInfo(), 
dataTypeProperties),
-                        toSeaTunnelType(mapTypeInfo.getValueTypeInfo(), 
dataTypeProperties));
+                        toSeaTunnelType(field, mapTypeInfo.getKeyTypeInfo(), 
dataTypeProperties),
+                        toSeaTunnelType(field, mapTypeInfo.getValueTypeInfo(), 
dataTypeProperties));
             case ARRAY:
                 ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) 
connectorDataType;
                 switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
@@ -176,17 +178,17 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
                         throw new MaxcomputeConnectorException(
                                 CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                                 String.format(
-                                        "SeaTunnel type not support this type 
[%s] now",
-                                        connectorDataType.getTypeName()));
+                                        "SeaTunnel type not support this type 
[%s] of the [%s] field now",
+                                        connectorDataType.getTypeName(), 
field));
                 }
             case STRUCT:
                 StructTypeInfo structTypeInfo = (StructTypeInfo) 
connectorDataType;
                 List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos();
                 List<String> fieldNames = new ArrayList<>(fields.size());
                 List<SeaTunnelDataType<?>> fieldTypes = new 
ArrayList<>(fields.size());
-                for (TypeInfo field : fields) {
-                    fieldNames.add(field.getTypeName());
-                    fieldTypes.add(toSeaTunnelType(field, dataTypeProperties));
+                for (TypeInfo f : fields) {
+                    fieldNames.add(f.getTypeName());
+                    fieldTypes.add(toSeaTunnelType(f.getTypeName(), f, 
dataTypeProperties));
                 }
                 return new SeaTunnelRowType(
                         fieldNames.toArray(new String[0]),
@@ -228,32 +230,35 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
                 throw new MaxcomputeConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                         String.format(
-                                "SeaTunnel type not support this type [%s] 
now",
-                                connectorDataType.getTypeName()));
+                                "SeaTunnel type not support this type [%s] of 
the [%s] field now",
+                                connectorDataType.getTypeName(), field));
         }
     }
 
     @Override
     public TypeInfo toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         switch (seaTunnelDataType.getSqlType()) {
             case MAP:
                 MapType mapType = (MapType) seaTunnelDataType;
                 return TypeInfoFactory.getMapTypeInfo(
-                        toConnectorType(mapType.getKeyType(), 
dataTypeProperties),
-                        toConnectorType(mapType.getValueType(), 
dataTypeProperties));
+                        toConnectorType(field, mapType.getKeyType(), 
dataTypeProperties),
+                        toConnectorType(field, mapType.getValueType(), 
dataTypeProperties));
             case ARRAY:
                 ArrayType arrayType = (ArrayType) seaTunnelDataType;
                 return TypeInfoFactory.getArrayTypeInfo(
-                        toConnectorType(arrayType.getElementType(), 
dataTypeProperties));
+                        toConnectorType(field, arrayType.getElementType(), 
dataTypeProperties));
             case ROW:
                 SeaTunnelRowType rowType = (SeaTunnelRowType) 
seaTunnelDataType;
                 List<String> fieldNames = new 
ArrayList<>(rowType.getTotalFields());
                 List<TypeInfo> fieldTypes = new 
ArrayList<>(rowType.getTotalFields());
                 for (int i = 0; i < rowType.getTotalFields(); i++) {
                     fieldNames.add(rowType.getFieldName(i));
-                    fieldTypes.add(toConnectorType(rowType.getFieldType(i), 
dataTypeProperties));
+                    fieldTypes.add(
+                            toConnectorType(field, rowType.getFieldType(i), 
dataTypeProperties));
                 }
                 return TypeInfoFactory.getStructTypeInfo(fieldNames, 
fieldTypes);
             case TINYINT:
@@ -290,8 +295,8 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
                 throw new MaxcomputeConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                         String.format(
-                                "Maxcompute type not support this type [%s] 
now",
-                                seaTunnelDataType.getSqlType()));
+                                "Maxcompute type not support this type [%s] of 
the [%s] field now",
+                                seaTunnelDataType.getSqlType(), field));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index 8e9eaf0785..776bd32ecb 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -87,10 +87,11 @@ public class MaxcomputeTypeMapper implements Serializable {
         try {
             MaxComputeDataTypeConvertor typeConvertor = new 
MaxComputeDataTypeConvertor();
             for (int i = 0; i < tableSchema.getColumns().size(); i++) {
-                fieldNames.add(tableSchema.getColumns().get(i).getName());
+                String fieldName = tableSchema.getColumns().get(i).getName();
+                fieldNames.add(fieldName);
                 TypeInfo maxcomputeTypeInfo = 
tableSchema.getColumns().get(i).getTypeInfo();
                 SeaTunnelDataType<?> seaTunnelDataType =
-                        typeConvertor.toSeaTunnelType(maxcomputeTypeInfo, 
null);
+                        typeConvertor.toSeaTunnelType(fieldName, 
maxcomputeTypeInfo, null);
                 seaTunnelDataTypes.add(seaTunnelDataType);
             }
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
index 0af30301ca..94a0e4f162 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
@@ -19,8 +19,11 @@ package 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
 
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -30,6 +33,10 @@ import com.aliyun.odps.type.MapTypeInfo;
 import com.aliyun.odps.type.TypeInfoFactory;
 import com.aliyun.odps.type.VarcharTypeInfo;
 
+import java.util.HashMap;
+
+import static com.aliyun.odps.type.TypeInfoFactory.INTERVAL_DAY_TIME;
+
 public class MaxComputeDataTypeConvertorTest {
 
     private final MaxComputeDataTypeConvertor maxComputeDataTypeConvertor =
@@ -39,7 +46,7 @@ public class MaxComputeDataTypeConvertorTest {
     public void testTypeInfoStrToSeaTunnelType() {
         String typeInfoStr = "MAP<STRING,STRING>";
         SeaTunnelDataType<?> seaTunnelType =
-                maxComputeDataTypeConvertor.toSeaTunnelType(typeInfoStr);
+                maxComputeDataTypeConvertor.toSeaTunnelType("", typeInfoStr);
         Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) 
seaTunnelType).getKeyType());
         Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) 
seaTunnelType).getKeyType());
     }
@@ -49,7 +56,7 @@ public class MaxComputeDataTypeConvertorTest {
         MapTypeInfo simpleMapTypeInfo =
                 TypeInfoFactory.getMapTypeInfo(new VarcharTypeInfo(10), new 
VarcharTypeInfo(10));
         MapType seaTunnelMapType =
-                (MapType) 
maxComputeDataTypeConvertor.toSeaTunnelType(simpleMapTypeInfo, null);
+                (MapType) maxComputeDataTypeConvertor.toSeaTunnelType("", 
simpleMapTypeInfo, null);
         Assertions.assertEquals(BasicType.STRING_TYPE, 
seaTunnelMapType.getKeyType());
         Assertions.assertEquals(BasicType.STRING_TYPE, 
seaTunnelMapType.getValueType());
     }
@@ -58,7 +65,7 @@ public class MaxComputeDataTypeConvertorTest {
     public void testSeaTunnelTypeToTypeInfo() {
         MapType mapType = new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
         MapTypeInfo mapTypeInfo =
-                (MapTypeInfo) maxComputeDataTypeConvertor.toConnectorType(mapType, null);
+                (MapTypeInfo) maxComputeDataTypeConvertor.toConnectorType("", mapType, null);
         Assertions.assertEquals(OdpsType.STRING, mapTypeInfo.getKeyTypeInfo().getOdpsType());
         Assertions.assertEquals(OdpsType.STRING, mapTypeInfo.getValueTypeInfo().getOdpsType());
     }
@@ -68,4 +75,35 @@ public class MaxComputeDataTypeConvertorTest {
         Assertions.assertEquals(
                 MaxcomputeConfig.PLUGIN_NAME, maxComputeDataTypeConvertor.getIdentity());
     }
+
+    @Test
+    public void testConvertorErrorMsgWithUnsupportedType() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new SeaTunnelDataType[0]);
+        MultipleRowType multipleRowType =
+                new MultipleRowType(new String[] {"table"}, new SeaTunnelRowType[] {rowType});
+        MaxComputeDataTypeConvertor maxCompute = new MaxComputeDataTypeConvertor();
+        MaxcomputeConnectorException exception =
+                Assertions.assertThrows(
+                        MaxcomputeConnectorException.class,
+                        () -> maxCompute.toSeaTunnelType("test", "UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - SeaTunnel type not support this type [UNSUPPORTED_TYPE] of the [test] field now",
+                exception.getMessage());
+        MaxcomputeConnectorException exception2 =
+                Assertions.assertThrows(
+                        MaxcomputeConnectorException.class,
+                        () ->
+                                maxCompute.toSeaTunnelType(
+                                        "test", INTERVAL_DAY_TIME, new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - SeaTunnel type not support this type [INTERVAL_DAY_TIME] of the [test] field now",
+                exception2.getMessage());
+        MaxcomputeConnectorException exception3 =
+                Assertions.assertThrows(
+                        MaxcomputeConnectorException.class,
+                        () -> maxCompute.toConnectorType("test", multipleRowType, new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - Maxcompute type not support this type [MULTIPLE_ROW] of the [test] field now",
+                exception3.getMessage());
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
index eeda2d22a6..ca7f50e500 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
@@ -49,7 +49,7 @@ public class StarRocksDataTypeConvertor implements DataTypeConvertor<MysqlType>
     public static final Integer DEFAULT_SCALE = 0;
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+    public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
         checkNotNull(connectorDataType, "connectorDataType can not be null");
         MysqlType mysqlType = MysqlType.getByName(connectorDataType);
         Map<String, Object> dataTypeProperties;
@@ -78,14 +78,14 @@ public class StarRocksDataTypeConvertor implements DataTypeConvertor<MysqlType>
                 dataTypeProperties = Collections.emptyMap();
                 break;
         }
-        return toSeaTunnelType(mysqlType, dataTypeProperties);
+        return toSeaTunnelType(field, mysqlType, dataTypeProperties);
     }
 
     // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType doesn't contains
     // properties.
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(
-            MysqlType mysqlType, Map<String, Object> dataTypeProperties)
+            String field, MysqlType mysqlType, Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         checkNotNull(mysqlType, "mysqlType can not be null");
 
@@ -148,13 +148,16 @@ public class StarRocksDataTypeConvertor implements DataTypeConvertor<MysqlType>
                 return new DecimalType(precision, scale);
                 // TODO: support 'SET' & 'YEAR' type
             default:
-                throw DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+                throw DataTypeConvertException.convertToSeaTunnelDataTypeException(
+                        field, mysqlType);
         }
     }
 
     @Override
     public MysqlType toConnectorType(
-            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            String field,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            Map<String, Object> dataTypeProperties)
             throws DataTypeConvertException {
         SqlType sqlType = seaTunnelDataType.getSqlType();
         // todo: verify
@@ -193,7 +196,9 @@ public class StarRocksDataTypeConvertor implements DataTypeConvertor<MysqlType>
             default:
                 throw new StarRocksConnectorException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        String.format("Doesn't support type '%s' yet", sqlType));
+                        String.format(
+                                "Doris doesn't support type '%s' of the '%s' field yet",
+                                sqlType, field));
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/DataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/DataTypeConvertorTest.java
new file mode 100644
index 0000000000..6521ca3b39
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/DataTypeConvertorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static com.mysql.cj.MysqlType.UNKNOWN;
+
+public class DataTypeConvertorTest {
+
+    @Test
+    void testConvertorErrorMsgWithUnsupportedType() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(new String[0], new SeaTunnelDataType[0]);
+        MultipleRowType multipleRowType =
+                new MultipleRowType(new String[] {"table"}, new SeaTunnelRowType[] {rowType});
+        StarRocksDataTypeConvertor starrocks = new StarRocksDataTypeConvertor();
+        DataTypeConvertException exception =
+                Assertions.assertThrows(
+                        DataTypeConvertException.class,
+                        () -> starrocks.toSeaTunnelType("test", "UNSUPPORTED_TYPE"));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+                exception.getMessage());
+        DataTypeConvertException exception2 =
+                Assertions.assertThrows(
+                        DataTypeConvertException.class,
+                        () -> starrocks.toSeaTunnelType("test", UNKNOWN, new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - Convert type: UNKNOWN of the test field to SeaTunnel data type error.",
+                exception2.getMessage());
+        StarRocksConnectorException exception3 =
+                Assertions.assertThrows(
+                        StarRocksConnectorException.class,
+                        () -> starrocks.toConnectorType("test", multipleRowType, new HashMap<>()));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - Doris doesn't support type 'MULTIPLE_ROW' of the 'test' field yet",
+                exception3.getMessage());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
similarity index 96%
rename from seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
rename to seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
index 5942e21d3b..d692d85999 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.starrocks;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
 
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
similarity index 99%
rename from seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java
rename to seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index b571deb68a..6298921c2d 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.starrocks;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
index 9d966ffe2f..4e50f3ec20 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
@@ -110,7 +110,7 @@ public class KubernetesIT {
             appsV1Api.createNamespacedStatefulSet(
                     namespace, yamlStatefulSet, null, null, null, null);
             Awaitility.await()
-                    .atMost(30, TimeUnit.SECONDS)
+                    .atMost(60, TimeUnit.SECONDS)
                     .untilAsserted(
                             () -> {
                                 V1StatefulSet v1StatefulSet =

Reply via email to