This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6cf6e41da7 [Fix][Connector-V2] Field information lost during Paimon 
DataType and SeaTunnel Column conversion (#6767)
6cf6e41da7 is described below

commit 6cf6e41da72dc7bdcad231a1c297d3cf771fbf9c
Author: xiaochen <[email protected]>
AuthorDate: Thu May 23 09:20:47 2024 +0800

    [Fix][Connector-V2] Field information lost during Paimon DataType and 
SeaTunnel Column conversion (#6767)
---
 .../seatunnel/common/exception/CommonError.java    |  20 ++
 .../common/exception/CommonErrorCode.java          |   8 +-
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |   9 +-
 .../seatunnel/paimon/config/PaimonConfig.java      |   2 +
 .../seatunnel/paimon/data/PaimonTypeMapper.java    |  13 +-
 .../seatunnel/paimon/utils/RowConverter.java       |  87 +++++----
 .../seatunnel/paimon/utils/RowKindConverter.java   |  12 +-
 .../seatunnel/paimon/utils/RowTypeConverter.java   | 205 ++++++++++++++++-----
 .../seatunnel/paimon/utils/SchemaUtil.java         |   8 +-
 .../paimon/utils/RowTypeConverterTest.java         |  61 +++++-
 10 files changed, 320 insertions(+), 105 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 9843774d0c..f283963347 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -39,8 +39,10 @@ import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.OPERATION_NOT_SUPPORTED;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ARRAY_GENERIC_TYPE;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
 
@@ -225,4 +227,22 @@ public class CommonError {
         params.put("optionName", optionName);
         return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR, 
params);
     }
+
+    public static SeaTunnelRuntimeException unsupportedArrayGenericType(
+            String identifier, String dataType, String fieldName) {
+        Map<String, String> params = new HashMap<>();
+        params.put("identifier", identifier);
+        params.put("dataType", dataType);
+        params.put("fieldName", fieldName);
+        return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, 
params);
+    }
+
+    public static SeaTunnelRuntimeException unsupportedRowKind(
+            String identifier, String tableId, String rowKind) {
+        Map<String, String> params = new HashMap<>();
+        params.put("identifier", identifier);
+        params.put("tableId", tableId);
+        params.put("rowKind", rowKind);
+        return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
+    }
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 830e651f80..3cf69285cb 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -55,10 +55,14 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
             "The table of <tableName> has no <keyName>, but the template \n 
<template> \n which has the place holder named <placeholder>. Please use the 
option named <optionName> to specify sql template"),
 
     VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is 
unsupported."),
-
     OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is 
unsupported."),
     CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
-            "COMMON-27", "The props named '<props>' of '<connector>' is 
blank.");
+            "COMMON-27", "The props named '<props>' of '<connector>' is 
blank."),
+    UNSUPPORTED_ARRAY_GENERIC_TYPE(
+            "COMMON-28",
+            "'<identifier>' array type not support genericType '<genericType>' 
of '<fieldName>'"),
+    UNSUPPORTED_ROW_KIND(
+            "COMMON-29", "'<identifier>' table '<tableId>' not support rowKind 
 '<rowKind>'");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index fab64da52e..1e40090805 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
@@ -37,6 +38,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -184,7 +186,12 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
         TableSchema.Builder builder = TableSchema.builder();
         dataFields.forEach(
                 dataField -> {
-                    Column column = 
SchemaUtil.toSeaTunnelType(dataField.type());
+                    BasicTypeDefine.BasicTypeDefineBuilder<DataType> 
typeDefineBuilder =
+                            BasicTypeDefine.<DataType>builder()
+                                    .name(dataField.name())
+                                    .comment(dataField.description())
+                                    .nativeType(dataField.type());
+                    Column column = 
SchemaUtil.toSeaTunnelType(typeDefineBuilder.build());
                     builder.column(column);
                 });
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index 0f41402d03..caa5f1e72c 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -48,6 +48,8 @@ import static java.util.stream.Collectors.toList;
 @Getter
 public class PaimonConfig implements Serializable {
 
+    public static final String CONNECTOR_IDENTITY = "Paimon";
+
     public static final Option<String> WAREHOUSE =
             Options.key("warehouse")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
index cbf512f61d..7d7d6403e3 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -18,8 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.paimon.data;
 
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
-import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
 
 import org.apache.paimon.types.DataType;
@@ -29,21 +30,21 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @AutoService(TypeConverter.class)
-public class PaimonTypeMapper implements TypeConverter<DataType> {
+public class PaimonTypeMapper implements 
TypeConverter<BasicTypeDefine<DataType>> {
     public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();
 
     @Override
     public String identifier() {
-        return PaimonSink.PLUGIN_NAME;
+        return PaimonConfig.CONNECTOR_IDENTITY;
     }
 
     @Override
-    public Column convert(DataType dataType) {
-        return RowTypeConverter.convert(dataType);
+    public Column convert(BasicTypeDefine<DataType> typeDefine) {
+        return RowTypeConverter.convert(typeDefine);
     }
 
     @Override
-    public DataType reconvert(Column column) {
+    public BasicTypeDefine<DataType> reconvert(Column column) {
         return RowTypeConverter.reconvert(column);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index cb45edac73..11d9b3b61a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 
 import org.apache.paimon.data.BinaryArray;
 import org.apache.paimon.data.BinaryArrayWriter;
@@ -68,7 +68,8 @@ public class RowConverter {
      * @param dataType Data type of the array
      * @return SeaTunnel array object
      */
-    public static Object convert(InternalArray array, SeaTunnelDataType<?> 
dataType) {
+    public static Object convertArrayType(
+            String fieldName, InternalArray array, SeaTunnelDataType<?> 
dataType) {
         switch (dataType.getSqlType()) {
             case STRING:
                 String[] strings = new String[array.size()];
@@ -119,10 +120,10 @@ public class RowConverter {
                 }
                 return doubles;
             default:
-                String errorMsg =
-                        String.format("Array type not support this genericType 
[%s]", dataType);
-                throw new PaimonConnectorException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, 
errorMsg);
+                throw CommonError.unsupportedArrayGenericType(
+                        PaimonConfig.CONNECTOR_IDENTITY,
+                        dataType.getSqlType().toString(),
+                        fieldName);
         }
     }
 
@@ -133,7 +134,8 @@ public class RowConverter {
      * @param dataType SeaTunnel array data type
      * @return Paimon array object {@link BinaryArray}
      */
-    public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> 
dataType) {
+    public static BinaryArray reconvert(
+            String fieldName, Object array, SeaTunnelDataType<?> dataType) {
         int length = ((Object[]) array).length;
         BinaryArray binaryArray = new BinaryArray();
         BinaryArrayWriter binaryArrayWriter;
@@ -220,10 +222,10 @@ public class RowConverter {
                 }
                 break;
             default:
-                String errorMsg =
-                        String.format("Array type not support this genericType 
[%s]", dataType);
-                throw new PaimonConnectorException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, 
errorMsg);
+                throw CommonError.unsupportedArrayGenericType(
+                        PaimonConfig.CONNECTOR_IDENTITY,
+                        dataType.getSqlType().toString(),
+                        fieldName);
         }
         binaryArrayWriter.complete();
         return binaryArray;
@@ -245,6 +247,7 @@ public class RowConverter {
                 continue;
             }
             SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+            String fieldName = seaTunnelRowType.getFieldName(i);
             switch (fieldType.getSqlType()) {
                 case TINYINT:
                     objects[i] = rowData.getByte(i);
@@ -265,12 +268,11 @@ public class RowConverter {
                     objects[i] = rowData.getDouble(i);
                     break;
                 case DECIMAL:
-                    SeaTunnelDataType<?> decimalType = 
seaTunnelRowType.getFieldType(i);
                     Decimal decimal =
                             rowData.getDecimal(
                                     i,
-                                    ((DecimalType) decimalType).getPrecision(),
-                                    ((DecimalType) decimalType).getScale());
+                                    ((DecimalType) fieldType).getPrecision(),
+                                    ((DecimalType) fieldType).getScale());
                     objects[i] = decimal.toBigDecimal();
                     break;
                 case STRING:
@@ -293,19 +295,21 @@ public class RowConverter {
                     objects[i] = timestamp.toLocalDateTime();
                     break;
                 case ARRAY:
-                    SeaTunnelDataType<?> arrayType = 
seaTunnelRowType.getFieldType(i);
-                    InternalArray array = rowData.getArray(i);
-                    objects[i] = convert(array, ((ArrayType<?, ?>) 
arrayType).getElementType());
+                    InternalArray paimonArray = rowData.getArray(i);
+                    ArrayType<?, ?> seatunnelArray = (ArrayType<?, ?>) 
fieldType;
+                    objects[i] =
+                            convertArrayType(
+                                    fieldName, paimonArray, 
seatunnelArray.getElementType());
                     break;
                 case MAP:
-                    SeaTunnelDataType<?> mapType = 
seaTunnelRowType.getFieldType(i);
+                    MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
                     InternalMap map = rowData.getMap(i);
                     InternalArray keyArray = map.keyArray();
                     InternalArray valueArray = map.valueArray();
-                    SeaTunnelDataType<?> keyType = ((MapType<?, ?>) 
mapType).getKeyType();
-                    SeaTunnelDataType<?> valueType = ((MapType<?, ?>) 
mapType).getValueType();
-                    Object[] key = (Object[]) convert(keyArray, keyType);
-                    Object[] value = (Object[]) convert(valueArray, valueType);
+                    SeaTunnelDataType<?> keyType = mapType.getKeyType();
+                    SeaTunnelDataType<?> valueType = mapType.getValueType();
+                    Object[] key = (Object[]) convertArrayType(fieldName, 
keyArray, keyType);
+                    Object[] value = (Object[]) convertArrayType(fieldName, 
valueArray, valueType);
                     Map<Object, Object> mapData = new HashMap<>();
                     for (int j = 0; j < key.length; j++) {
                         mapData.put(key[j], value[j]);
@@ -319,9 +323,10 @@ public class RowConverter {
                     objects[i] = convert(row, (SeaTunnelRowType) rowType);
                     break;
                 default:
-                    throw new PaimonConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "SeaTunnel does not support this type");
+                    throw CommonError.unsupportedDataType(
+                            PaimonConfig.CONNECTOR_IDENTITY,
+                            fieldType.getSqlType().toString(),
+                            fieldName);
             }
         }
         return new SeaTunnelRow(objects);
@@ -343,6 +348,12 @@ public class RowConverter {
         // Convert SeaTunnel RowKind to Paimon RowKind
         org.apache.paimon.types.RowKind rowKind =
                 
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
+        if (rowKind == null) {
+            throw CommonError.unsupportedRowKind(
+                    PaimonConfig.CONNECTOR_IDENTITY,
+                    seaTunnelRow.getRowKind().shortString(),
+                    seaTunnelRow.getTableId());
+        }
         binaryRow.setRowKind(rowKind);
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
         for (int i = 0; i < fieldTypes.length; i++) {
@@ -351,6 +362,7 @@ public class RowConverter {
                 binaryWriter.setNullAt(i);
                 continue;
             }
+            String fieldName = seaTunnelRowType.getFieldName(i);
             switch (fieldTypes[i].getSqlType()) {
                 case TINYINT:
                     binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
@@ -396,7 +408,6 @@ public class RowConverter {
                             .setValue(binaryWriter, i, 
DateTimeUtils.toInternal(date));
                     break;
                 case TIMESTAMP:
-                    String fieldName = seaTunnelRowType.getFieldName(i);
                     DataField dataField = SchemaUtil.getDataField(fields, 
fieldName);
                     int precision = ((TimestampType) 
dataField.type()).getPrecision();
                     LocalDateTime datetime = (LocalDateTime) 
seaTunnelRow.getField(i);
@@ -407,26 +418,31 @@ public class RowConverter {
                     MapType<?, ?> mapType = (MapType<?, ?>) 
seaTunnelRowType.getFieldType(i);
                     SeaTunnelDataType<?> keyType = mapType.getKeyType();
                     SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    DataType paimonKeyType = 
RowTypeConverter.reconvert(keyType);
-                    DataType paimonValueType = 
RowTypeConverter.reconvert(valueType);
+                    DataType paimonKeyType = 
RowTypeConverter.reconvert(fieldName, keyType);
+                    DataType paimonValueType = 
RowTypeConverter.reconvert(fieldName, valueType);
                     Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
                     Object[] keys = field.keySet().toArray(new Object[0]);
                     Object[] values = field.values().toArray(new Object[0]);
                     binaryWriter.writeMap(
                             i,
                             BinaryMap.valueOf(
-                                    reconvert(keys, keyType), 
reconvert(values, valueType)),
+                                    reconvert(fieldName, keys, keyType),
+                                    reconvert(fieldName, values, valueType)),
                             new InternalMapSerializer(paimonKeyType, 
paimonValueType));
                     break;
                 case ARRAY:
                     ArrayType<?, ?> arrayType = (ArrayType<?, ?>) 
seaTunnelRowType.getFieldType(i);
                     BinaryArray paimonArray =
-                            reconvert(seaTunnelRow.getField(i), 
arrayType.getElementType());
+                            reconvert(
+                                    fieldName,
+                                    seaTunnelRow.getField(i),
+                                    arrayType.getElementType());
                     binaryWriter.writeArray(
                             i,
                             paimonArray,
                             new InternalArraySerializer(
-                                    
RowTypeConverter.reconvert(arrayType.getElementType())));
+                                    RowTypeConverter.reconvert(
+                                            fieldName, 
arrayType.getElementType())));
                     break;
                 case ROW:
                     SeaTunnelDataType<?> rowType = 
seaTunnelRowType.getFieldType(i);
@@ -438,9 +454,10 @@ public class RowConverter {
                     binaryWriter.writeRow(i, paimonRow, new 
InternalRowSerializer(paimonRowType));
                     break;
                 default:
-                    throw new PaimonConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "Unsupported data type " + 
seaTunnelRowType.getFieldType(i));
+                    throw CommonError.unsupportedDataType(
+                            PaimonConfig.CONNECTOR_IDENTITY,
+                            
seaTunnelRowType.getFieldType(i).getSqlType().toString(),
+                            fieldName);
             }
         }
         return binaryRow;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
index ce6a172e43..adb77c637d 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
@@ -18,8 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
 import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
 import org.apache.paimon.data.InternalRow;
 
@@ -28,12 +26,12 @@ public class RowKindConverter {
     /**
      * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link 
InternalRow}
      *
-     * @param seaTunnelRowInd
+     * @param seaTunnelRowKind The kind of change that a row describes in a 
changelog.
      * @return
      */
     public static org.apache.paimon.types.RowKind 
convertSeaTunnelRowKind2PaimonRowKind(
-            RowKind seaTunnelRowInd) {
-        switch (seaTunnelRowInd) {
+            RowKind seaTunnelRowKind) {
+        switch (seaTunnelRowKind) {
             case DELETE:
                 return org.apache.paimon.types.RowKind.DELETE;
             case UPDATE_AFTER:
@@ -43,9 +41,7 @@ public class RowKindConverter {
             case INSERT:
                 return org.apache.paimon.types.RowKind.INSERT;
             default:
-                throw new PaimonConnectorException(
-                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        "Unsupported rowKind type " + 
seaTunnelRowInd.shortString());
+                return null;
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index f38d353142..b250fd21e9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -19,13 +19,14 @@ package 
org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.ArrayType;
@@ -53,13 +54,17 @@ import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
-import java.util.Arrays;
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.List;
 import java.util.Objects;
 
+@Slf4j
 /** The converter for converting {@link RowType} and {@link SeaTunnelRowType} 
*/
 public class RowTypeConverter {
 
+    private static String UNKNOWN_FIELD = "UNKNOWN";
+
     private RowTypeConverter() {}
 
     /**
@@ -80,11 +85,20 @@ public class RowTypeConverter {
     /**
      * Convert Paimon row type {@link DataType} to SeaTunnel row type {@link 
SeaTunnelDataType}
      *
-     * @param dataType Paimon data type
+     * @param typeDefine Paimon data type
      * @return SeaTunnel data type {@link SeaTunnelDataType}
      */
-    public static Column convert(DataType dataType) {
-        PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = 
PhysicalColumn.builder();
+    public static Column convert(BasicTypeDefine<DataType> typeDefine) {
+
+        PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .sourceType(typeDefine.getColumnType())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+
+        DataType dataType = typeDefine.getNativeType();
         SeaTunnelDataType<?> seaTunnelDataType;
         PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
                 PaimonToSeaTunnelTypeVisitor.INSTANCE;
@@ -156,6 +170,12 @@ public class RowTypeConverter {
                 break;
             case ARRAY:
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType);
+                if (seaTunnelDataType == null) {
+                    throw CommonError.unsupportedArrayGenericType(
+                            PaimonConfig.CONNECTOR_IDENTITY,
+                            dataType.getTypeRoot().toString(),
+                            typeDefine.getName());
+                }
                 break;
             case MAP:
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((MapType) dataType);
@@ -164,12 +184,10 @@ public class RowTypeConverter {
                 seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((RowType) dataType);
                 break;
             default:
-                String errorMsg =
-                        String.format(
-                                "Paimon dataType not support this genericType 
[%s]",
-                                dataType.asSQLString());
-                throw new PaimonConnectorException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, 
errorMsg);
+                throw CommonError.unsupportedDataType(
+                        PaimonConfig.CONNECTOR_IDENTITY,
+                        dataType.asSQLString(),
+                        typeDefine.getName());
         }
         return physicalColumnBuilder.dataType(seaTunnelDataType).build();
     }
@@ -182,16 +200,15 @@ public class RowTypeConverter {
      */
     public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, 
TableSchema tableSchema) {
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        int totalFields = seaTunnelRowType.getTotalFields();
         List<DataField> fields = tableSchema.fields();
-        DataType[] dataTypes =
-                Arrays.stream(fieldTypes)
-                        .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
-                        .toArray(DataType[]::new);
-        DataField[] dataFields = new DataField[dataTypes.length];
-        for (int i = 0; i < dataTypes.length; i++) {
-            DataType dataType = dataTypes[i];
+        DataField[] dataFields = new DataField[totalFields];
+        for (int i = 0; i < totalFields; i++) {
+            String fieldName = fieldNames[i];
+            DataType dataType =
+                    SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName, 
fieldTypes[i]);
             DataTypeRoot typeRoot = dataType.getTypeRoot();
-            String fieldName = seaTunnelRowType.getFieldName(i);
             if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
                     || 
typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
                 DataField dataField = SchemaUtil.getDataField(fields, 
fieldName);
@@ -209,18 +226,20 @@ public class RowTypeConverter {
      * @param column SeaTunnel data type {@link Column}
      * @return Paimon data type {@link DataType}
      */
-    public static DataType reconvert(Column column) {
+    public static BasicTypeDefine<DataType> reconvert(Column column) {
         return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column);
     }
 
     /**
-     * Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data 
type {@link DataType}
+     * Mapping SeaTunnel data type {@link SeaTunnelDataType} of fieldName to 
Paimon data type {@link
+     * DataType}
      *
+     * @param fieldName SeaTunnel field name
      * @param dataType SeaTunnel data type {@link SeaTunnelDataType}
      * @return Paimon data type {@link DataType}
      */
-    public static DataType reconvert(SeaTunnelDataType<?> dataType) {
-        return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType);
+    public static DataType reconvert(String fieldName, SeaTunnelDataType<?> 
dataType) {
+        return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName, 
dataType);
     }
 
     /**
@@ -234,22 +253,109 @@ public class RowTypeConverter {
 
         private SeaTunnelTypeToPaimonVisitor() {}
 
-        public DataType visit(Column column) {
+        public BasicTypeDefine<DataType> visit(Column column) {
+            BasicTypeDefine.BasicTypeDefineBuilder<DataType> builder =
+                    BasicTypeDefine.<DataType>builder()
+                            .name(column.getName())
+                            .nullable(column.isNullable())
+                            .comment(column.getComment())
+                            .defaultValue(column.getDefaultValue());
             SeaTunnelDataType<?> dataType = column.getDataType();
             Integer scale = column.getScale();
             switch (dataType.getSqlType()) {
                 case TIMESTAMP:
-                    return DataTypes.TIMESTAMP(
-                            Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale);
+                    int timestampScale =
+                            Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale;
+                    TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale);
+                    builder.nativeType(timestampType);
+                    builder.dataType(timestampType.getTypeRoot().name());
+                    builder.columnType(timestampType.toString());
+                    builder.scale(timestampScale);
+                    builder.length(column.getColumnLength());
+                    return builder.build();
                 case TIME:
-                    return DataTypes.TIME(
-                            Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale);
+                    int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale;
+                    TimeType timeType = DataTypes.TIME(timeScale);
+                    builder.nativeType(timeType);
+                    builder.columnType(timeType.toString());
+                    builder.dataType(timeType.getTypeRoot().name());
+                    builder.scale(timeScale);
+                    builder.length(column.getColumnLength());
+                    return builder.build();
+                case DECIMAL:
+                    org.apache.seatunnel.api.table.type.DecimalType seatunnelDecimalType =
+                            (org.apache.seatunnel.api.table.type.DecimalType) dataType;
+                    int precision = seatunnelDecimalType.getPrecision();
+                    scale = seatunnelDecimalType.getScale();
+                    if (precision <= 0) {
+                        precision = DecimalType.DEFAULT_PRECISION;
+                        scale = DecimalType.DEFAULT_SCALE;
+                        log.warn(
+                                "The decimal column {} type decimal({},{}) is out of range, "
+                                        + "which is precision less than 0, "
+                                        + "it will be converted to decimal({},{})",
+                                column.getName(),
+                                seatunnelDecimalType.getPrecision(),
+                                seatunnelDecimalType.getScale(),
+                                precision,
+                                scale);
+                    } else if (precision > DecimalType.MAX_PRECISION) {
+                        scale = (int) Math.max(0, scale - (precision - DecimalType.MAX_PRECISION));
+                        precision = DecimalType.MAX_PRECISION;
+                        log.warn(
+                                "The decimal column {} type decimal({},{}) is out of range, "
+                                        + "which exceeds the maximum precision of {}, "
+                                        + "it will be converted to decimal({},{})",
+                                column.getName(),
+                                seatunnelDecimalType.getPrecision(),
+                                seatunnelDecimalType.getScale(),
+                                DecimalType.MAX_PRECISION,
+                                precision,
+                                scale);
+                    }
+                    if (scale < 0) {
+                        scale = DecimalType.DEFAULT_SCALE;
+                        log.warn(
+                                "The decimal column {} type decimal({},{}) is out of range, "
+                                        + "which is scale less than 0, "
+                                        + "it will be converted to decimal({},{})",
+                                column.getName(),
+                                seatunnelDecimalType.getPrecision(),
+                                seatunnelDecimalType.getScale(),
+                                precision,
+                                scale);
+                    } else if (scale > DecimalType.MAX_PRECISION) {
+                        scale = DecimalType.MAX_PRECISION;
+                        log.warn(
+                                "The decimal column {} type decimal({},{}) is out of range, "
+                                        + "which exceeds the maximum scale of {}, "
+                                        + "it will be converted to decimal({},{})",
+                                column.getName(),
+                                seatunnelDecimalType.getPrecision(),
+                                seatunnelDecimalType.getScale(),
+                                DecimalType.MAX_PRECISION,
+                                precision,
+                                scale);
+                    }
+
+                    DecimalType paimonDecimalType = DataTypes.DECIMAL(precision, scale);
+                    builder.nativeType(paimonDecimalType);
+                    builder.columnType(paimonDecimalType.toString());
+                    builder.dataType(paimonDecimalType.getTypeRoot().name());
+                    builder.scale(scale);
+                    builder.precision((long) precision);
+                    builder.length(column.getColumnLength());
+                    return builder.build();
                 default:
-                    return visit(dataType);
+                    builder.nativeType(visit(column.getName(), dataType));
+                    builder.columnType(dataType.toString());
+                    builder.length(column.getColumnLength());
+                    builder.dataType(dataType.getSqlType().name());
+                    return builder.build();
             }
         }
 
-        public DataType visit(SeaTunnelDataType<?> dataType) {
+        public DataType visit(String fieldName, SeaTunnelDataType<?> dataType) {
             switch (dataType.getSqlType()) {
                 case TINYINT:
                     return DataTypes.TINYINT();
@@ -288,22 +394,29 @@ public class RowTypeConverter {
                     SeaTunnelDataType<?> valueType =
                             ((org.apache.seatunnel.api.table.type.MapType<?, ?>) dataType)
                                     .getValueType();
-                    return DataTypes.MAP(visit(keyType), visit(valueType));
+                    return DataTypes.MAP(visit(fieldName, keyType), visit(fieldName, valueType));
                 case ARRAY:
                     SeaTunnelDataType<?> elementType =
                             ((org.apache.seatunnel.api.table.type.ArrayType<?, ?>) dataType)
                                     .getElementType();
-                    return DataTypes.ARRAY(visit(elementType));
+                    return DataTypes.ARRAY(visit(fieldName, elementType));
                 case ROW:
-                    SeaTunnelDataType<?>[] fieldTypes =
-                            ((SeaTunnelRowType) dataType).getFieldTypes();
-                    DataType[] dataTypes =
-                            Arrays.stream(fieldTypes).map(this::visit).toArray(DataType[]::new);
+                    SeaTunnelRowType row = (SeaTunnelRowType) dataType;
+                    SeaTunnelDataType<?>[] fieldTypes = row.getFieldTypes();
+                    String[] fieldNames = row.getFieldNames();
+                    int totalFields = row.getTotalFields();
+                    DataType[] dataTypes = new DataType[totalFields];
+                    for (int i = 0; i < totalFields; i++) {
+                        dataTypes[i] =
+                                SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(
+                                        fieldNames[i], fieldTypes[i]);
+                    }
                     return DataTypes.ROW(dataTypes);
                 default:
-                    throw new PaimonConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "Unsupported data type: " + dataType.getSqlType());
+                    throw CommonError.unsupportedDataType(
+                            PaimonConfig.CONNECTOR_IDENTITY,
+                            dataType.getSqlType().toString(),
+                            fieldName);
             }
         }
     }
@@ -417,12 +530,7 @@ public class RowTypeConverter {
                 case DOUBLE:
                     return org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE;
                 default:
-                    String errorMsg =
-                            String.format(
-                                    "Array type not support this genericType [%s]",
-                                    seaTunnelArrayType);
-                    throw new PaimonConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                    return null;
             }
         }
 
@@ -445,9 +553,8 @@ public class RowTypeConverter {
 
         @Override
         protected SeaTunnelDataType defaultMethod(DataType dataType) {
-            throw new PaimonConnectorException(
-                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                    "Unsupported data type: " + dataType);
+            throw CommonError.unsupportedDataType(
+                    PaimonConfig.CONNECTOR_IDENTITY, dataType.getTypeRoot().name(), UNKNOWN_FIELD);
         }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index 65129dc8b7..0da047244f 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
 
@@ -34,7 +35,8 @@ import java.util.Objects;
 public class SchemaUtil {
 
     public static DataType toPaimonType(Column column) {
-        return PaimonTypeMapper.INSTANCE.reconvert(column);
+        BasicTypeDefine<DataType> basicTypeDefine = PaimonTypeMapper.INSTANCE.reconvert(column);
+        return basicTypeDefine.getNativeType();
     }
 
     public static Schema toPaimonSchema(
@@ -62,8 +64,8 @@ public class SchemaUtil {
         return paiSchemaBuilder.build();
     }
 
-    public static Column toSeaTunnelType(DataType dataType) {
-        return PaimonTypeMapper.INSTANCE.convert(dataType);
+    public static Column toSeaTunnelType(BasicTypeDefine<DataType> typeDefine) {
+        return PaimonTypeMapper.INSTANCE.convert(typeDefine);
     }
 
     public static DataField getDataField(List<DataField> fields, String fieldName) {
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index 5e614aeda5..e075efde15 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -46,6 +49,10 @@ public class RowTypeConverterTest {
 
     private RowType rowType;
 
+    private BasicTypeDefine<DataType> typeDefine;
+
+    private Column column;
+
     private TableSchema tableSchema;
 
     public static final RowType DEFAULT_ROW_TYPE =
@@ -148,10 +155,39 @@ public class RowTypeConverterTest {
                         KEY_NAME_LIST,
                         Collections.EMPTY_MAP,
                         "");
+
+        typeDefine =
+                BasicTypeDefine.<DataType>builder()
+                        .name("c_decimal")
+                        .comment("c_decimal_type_define")
+                        .columnType("DECIMAL(30, 8)")
+                        .nativeType(DataTypes.DECIMAL(30, 8))
+                        .dataType(DataTypes.DECIMAL(30, 8).toString())
+                        .length(30L)
+                        .precision(30L)
+                        .scale(8)
+                        .defaultValue(3.0)
+                        .nullable(false)
+                        .build();
+
+        org.apache.seatunnel.api.table.type.DecimalType dataType =
+                new org.apache.seatunnel.api.table.type.DecimalType(30, 8);
+
+        column =
+                PhysicalColumn.builder()
+                        .name("c_decimal")
+                        .sourceType(DataTypes.DECIMAL(30, 8).toString())
+                        .nullable(false)
+                        .dataType(dataType)
+                        .columnLength(30L)
+                        .defaultValue(3.0)
+                        .scale(8)
+                        .comment("c_decimal_type_define")
+                        .build();
     }
 
     @Test
-    public void paimonToSeaTunnel() {
+    public void paimonRowTypeToSeaTunnel() {
         SeaTunnelRowType convert = RowTypeConverter.convert(rowType);
         Assertions.assertEquals(convert, seaTunnelRowType);
     }
@@ -161,4 +197,27 @@ public class RowTypeConverterTest {
         RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, tableSchema);
         Assertions.assertEquals(convert, rowType);
     }
+
+    @Test
+    public void paimonDataTypeToSeaTunnelColumn() {
+        Column column = RowTypeConverter.convert(typeDefine);
+        isEquals(column, typeDefine);
+    }
+
+    @Test
+    public void seaTunnelColumnToPaimonDataType() {
+        BasicTypeDefine<DataType> dataTypeDefine = RowTypeConverter.reconvert(column);
+        isEquals(column, dataTypeDefine);
+    }
+
+    private void isEquals(Column column, BasicTypeDefine<DataType> dataTypeDefine) {
+        Assertions.assertEquals(column.getComment(), dataTypeDefine.getComment());
+        Assertions.assertEquals(column.getColumnLength(), dataTypeDefine.getLength());
+        Assertions.assertEquals(column.getName(), dataTypeDefine.getName());
+        Assertions.assertEquals(column.isNullable(), dataTypeDefine.isNullable());
+        Assertions.assertEquals(column.getDefaultValue(), dataTypeDefine.getDefaultValue());
+        Assertions.assertEquals(column.getScale(), dataTypeDefine.getScale());
+        Assertions.assertTrue(
+                column.getDataType().toString().equalsIgnoreCase(dataTypeDefine.getColumnType()));
+    }
 }


Reply via email to