This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6cf6e41da7 [Fix][Connector-V2] Field information lost during Paimon
DataType and SeaTunnel Column conversion (#6767)
6cf6e41da7 is described below
commit 6cf6e41da72dc7bdcad231a1c297d3cf771fbf9c
Author: xiaochen <[email protected]>
AuthorDate: Thu May 23 09:20:47 2024 +0800
[Fix][Connector-V2] Field information lost during Paimon DataType and
SeaTunnel Column conversion (#6767)
---
.../seatunnel/common/exception/CommonError.java | 20 ++
.../common/exception/CommonErrorCode.java | 8 +-
.../seatunnel/paimon/catalog/PaimonCatalog.java | 9 +-
.../seatunnel/paimon/config/PaimonConfig.java | 2 +
.../seatunnel/paimon/data/PaimonTypeMapper.java | 13 +-
.../seatunnel/paimon/utils/RowConverter.java | 87 +++++----
.../seatunnel/paimon/utils/RowKindConverter.java | 12 +-
.../seatunnel/paimon/utils/RowTypeConverter.java | 205 ++++++++++++++++-----
.../seatunnel/paimon/utils/SchemaUtil.java | 8 +-
.../paimon/utils/RowTypeConverterTest.java | 61 +++++-
10 files changed, 320 insertions(+), 105 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 9843774d0c..f283963347 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -39,8 +39,10 @@ import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_
import static
org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.OPERATION_NOT_SUPPORTED;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ARRAY_GENERIC_TYPE;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
@@ -225,4 +227,22 @@ public class CommonError {
params.put("optionName", optionName);
return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR,
params);
}
+
+ /**
+ * Builds the exception reported when an array's element (generic) type is not supported.
+ *
+ * @param identifier connector identifier substituted into the message
+ * @param dataType unsupported element type; fills the {@code <genericType>} placeholder
+ * @param fieldName name of the field that carries the array
+ * @return exception carrying {@code UNSUPPORTED_ARRAY_GENERIC_TYPE} and its parameters
+ */
+ public static SeaTunnelRuntimeException unsupportedArrayGenericType(
+ String identifier, String dataType, String fieldName) {
+ Map<String, String> params = new HashMap<>();
+ params.put("identifier", identifier);
+ // The key must match the <genericType> placeholder of the COMMON-28 template;
+ // under the key "dataType" the placeholder was left unresolved in the message.
+ params.put("genericType", dataType);
+ params.put("fieldName", fieldName);
+ return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params);
+ }
+
+ /**
+ * Builds the exception reported when a table is given a row kind it does not support.
+ *
+ * @param identifier connector identifier substituted into the message
+ * @param tableId identifier of the table the row belongs to
+ * @param rowKind the row kind that is not supported
+ * @return exception carrying {@code UNSUPPORTED_ROW_KIND} and its parameters
+ */
+ public static SeaTunnelRuntimeException unsupportedRowKind(
+ String identifier, String tableId, String rowKind) {
+ final Map<String, String> templateArgs = new HashMap<>();
+ templateArgs.put("rowKind", rowKind);
+ templateArgs.put("tableId", tableId);
+ templateArgs.put("identifier", identifier);
+ return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, templateArgs);
+ }
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 830e651f80..3cf69285cb 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -55,10 +55,14 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"The table of <tableName> has no <keyName>, but the template \n
<template> \n which has the place holder named <placeholder>. Please use the
option named <optionName> to specify sql template"),
VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is
unsupported."),
-
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is
unsupported."),
CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
- "COMMON-27", "The props named '<props>' of '<connector>' is
blank.");
+ "COMMON-27", "The props named '<props>' of '<connector>' is
blank."),
+ UNSUPPORTED_ARRAY_GENERIC_TYPE(
+ "COMMON-28",
+ "'<identifier>' array type not support genericType '<genericType>'
of '<fieldName>'"),
+ UNSUPPORTED_ROW_KIND(
+ "COMMON-29", "'<identifier>' table '<tableId>' not support rowKind
'<rowKind>'");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index fab64da52e..1e40090805 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
@@ -37,6 +38,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import lombok.extern.slf4j.Slf4j;
@@ -184,7 +186,12 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
- Column column =
SchemaUtil.toSeaTunnelType(dataField.type());
+ BasicTypeDefine.BasicTypeDefineBuilder<DataType>
typeDefineBuilder =
+ BasicTypeDefine.<DataType>builder()
+ .name(dataField.name())
+ .comment(dataField.description())
+ .nativeType(dataField.type());
+ Column column =
SchemaUtil.toSeaTunnelType(typeDefineBuilder.build());
builder.column(column);
});
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index 0f41402d03..caa5f1e72c 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -48,6 +48,8 @@ import static java.util.stream.Collectors.toList;
@Getter
public class PaimonConfig implements Serializable {
+ public static final String CONNECTOR_IDENTITY = "Paimon";
+
public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
index cbf512f61d..7d7d6403e3 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.data;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
-import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
import org.apache.paimon.types.DataType;
@@ -29,21 +30,21 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@AutoService(TypeConverter.class)
-public class PaimonTypeMapper implements TypeConverter<DataType> {
+public class PaimonTypeMapper implements
TypeConverter<BasicTypeDefine<DataType>> {
public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();
@Override
public String identifier() {
- return PaimonSink.PLUGIN_NAME;
+ return PaimonConfig.CONNECTOR_IDENTITY;
}
@Override
- public Column convert(DataType dataType) {
- return RowTypeConverter.convert(dataType);
+ public Column convert(BasicTypeDefine<DataType> typeDefine) {
+ return RowTypeConverter.convert(typeDefine);
}
@Override
- public DataType reconvert(Column column) {
+ public BasicTypeDefine<DataType> reconvert(Column column) {
return RowTypeConverter.reconvert(column);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index cb45edac73..11d9b3b61a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
@@ -68,7 +68,8 @@ public class RowConverter {
* @param dataType Data type of the array
* @return SeaTunnel array object
*/
- public static Object convert(InternalArray array, SeaTunnelDataType<?>
dataType) {
+ public static Object convertArrayType(
+ String fieldName, InternalArray array, SeaTunnelDataType<?>
dataType) {
switch (dataType.getSqlType()) {
case STRING:
String[] strings = new String[array.size()];
@@ -119,10 +120,10 @@ public class RowConverter {
}
return doubles;
default:
- String errorMsg =
- String.format("Array type not support this genericType
[%s]", dataType);
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
+ throw CommonError.unsupportedArrayGenericType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ dataType.getSqlType().toString(),
+ fieldName);
}
}
@@ -133,7 +134,8 @@ public class RowConverter {
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
- public static BinaryArray reconvert(Object array, SeaTunnelDataType<?>
dataType) {
+ public static BinaryArray reconvert(
+ String fieldName, Object array, SeaTunnelDataType<?> dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
@@ -220,10 +222,10 @@ public class RowConverter {
}
break;
default:
- String errorMsg =
- String.format("Array type not support this genericType
[%s]", dataType);
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
+ throw CommonError.unsupportedArrayGenericType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ dataType.getSqlType().toString(),
+ fieldName);
}
binaryArrayWriter.complete();
return binaryArray;
@@ -245,6 +247,7 @@ public class RowConverter {
continue;
}
SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+ String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldType.getSqlType()) {
case TINYINT:
objects[i] = rowData.getByte(i);
@@ -265,12 +268,11 @@ public class RowConverter {
objects[i] = rowData.getDouble(i);
break;
case DECIMAL:
- SeaTunnelDataType<?> decimalType =
seaTunnelRowType.getFieldType(i);
Decimal decimal =
rowData.getDecimal(
i,
- ((DecimalType) decimalType).getPrecision(),
- ((DecimalType) decimalType).getScale());
+ ((DecimalType) fieldType).getPrecision(),
+ ((DecimalType) fieldType).getScale());
objects[i] = decimal.toBigDecimal();
break;
case STRING:
@@ -293,19 +295,21 @@ public class RowConverter {
objects[i] = timestamp.toLocalDateTime();
break;
case ARRAY:
- SeaTunnelDataType<?> arrayType =
seaTunnelRowType.getFieldType(i);
- InternalArray array = rowData.getArray(i);
- objects[i] = convert(array, ((ArrayType<?, ?>)
arrayType).getElementType());
+ InternalArray paimonArray = rowData.getArray(i);
+ ArrayType<?, ?> seatunnelArray = (ArrayType<?, ?>)
fieldType;
+ objects[i] =
+ convertArrayType(
+ fieldName, paimonArray,
seatunnelArray.getElementType());
break;
case MAP:
- SeaTunnelDataType<?> mapType =
seaTunnelRowType.getFieldType(i);
+ MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
InternalMap map = rowData.getMap(i);
InternalArray keyArray = map.keyArray();
InternalArray valueArray = map.valueArray();
- SeaTunnelDataType<?> keyType = ((MapType<?, ?>)
mapType).getKeyType();
- SeaTunnelDataType<?> valueType = ((MapType<?, ?>)
mapType).getValueType();
- Object[] key = (Object[]) convert(keyArray, keyType);
- Object[] value = (Object[]) convert(valueArray, valueType);
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ Object[] key = (Object[]) convertArrayType(fieldName,
keyArray, keyType);
+ Object[] value = (Object[]) convertArrayType(fieldName,
valueArray, valueType);
Map<Object, Object> mapData = new HashMap<>();
for (int j = 0; j < key.length; j++) {
mapData.put(key[j], value[j]);
@@ -319,9 +323,10 @@ public class RowConverter {
objects[i] = convert(row, (SeaTunnelRowType) rowType);
break;
default:
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "SeaTunnel does not support this type");
+ throw CommonError.unsupportedDataType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ fieldType.getSqlType().toString(),
+ fieldName);
}
}
return new SeaTunnelRow(objects);
@@ -343,6 +348,12 @@ public class RowConverter {
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
+ if (rowKind == null) {
+ // The row-kind converter yields null for kinds it cannot map. Argument order
+ // must follow unsupportedRowKind(identifier, tableId, rowKind); the table id
+ // and row kind were previously passed in swapped positions.
+ throw CommonError.unsupportedRowKind(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ seaTunnelRow.getTableId(),
+ seaTunnelRow.getRowKind().shortString());
+ }
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
@@ -351,6 +362,7 @@ public class RowConverter {
binaryWriter.setNullAt(i);
continue;
}
+ String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
@@ -396,7 +408,6 @@ public class RowConverter {
.setValue(binaryWriter, i,
DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
- String fieldName = seaTunnelRowType.getFieldName(i);
DataField dataField = SchemaUtil.getDataField(fields,
fieldName);
int precision = ((TimestampType)
dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime)
seaTunnelRow.getField(i);
@@ -407,26 +418,31 @@ public class RowConverter {
MapType<?, ?> mapType = (MapType<?, ?>)
seaTunnelRowType.getFieldType(i);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
- DataType paimonKeyType =
RowTypeConverter.reconvert(keyType);
- DataType paimonValueType =
RowTypeConverter.reconvert(valueType);
+ DataType paimonKeyType =
RowTypeConverter.reconvert(fieldName, keyType);
+ DataType paimonValueType =
RowTypeConverter.reconvert(fieldName, valueType);
Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
Object[] keys = field.keySet().toArray(new Object[0]);
Object[] values = field.values().toArray(new Object[0]);
binaryWriter.writeMap(
i,
BinaryMap.valueOf(
- reconvert(keys, keyType),
reconvert(values, valueType)),
+ reconvert(fieldName, keys, keyType),
+ reconvert(fieldName, values, valueType)),
new InternalMapSerializer(paimonKeyType,
paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>)
seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
- reconvert(seaTunnelRow.getField(i),
arrayType.getElementType());
+ reconvert(
+ fieldName,
+ seaTunnelRow.getField(i),
+ arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
new InternalArraySerializer(
-
RowTypeConverter.reconvert(arrayType.getElementType())));
+ RowTypeConverter.reconvert(
+ fieldName,
arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType =
seaTunnelRowType.getFieldType(i);
@@ -438,9 +454,10 @@ public class RowConverter {
binaryWriter.writeRow(i, paimonRow, new
InternalRowSerializer(paimonRowType));
break;
default:
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unsupported data type " +
seaTunnelRowType.getFieldType(i));
+ throw CommonError.unsupportedDataType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+
seaTunnelRowType.getFieldType(i).getSqlType().toString(),
+ fieldName);
}
}
return binaryRow;
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
index ce6a172e43..adb77c637d 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
@@ -18,8 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.paimon.data.InternalRow;
@@ -28,12 +26,12 @@ public class RowKindConverter {
/**
* Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link
InternalRow}
*
- * @param seaTunnelRowInd
+ * @param seaTunnelRowKind The kind of change that a row describes in a
changelog.
* @return the corresponding Paimon RowKind, or {@code null} if the SeaTunnel RowKind is not supported
*/
public static org.apache.paimon.types.RowKind
convertSeaTunnelRowKind2PaimonRowKind(
- RowKind seaTunnelRowInd) {
- switch (seaTunnelRowInd) {
+ RowKind seaTunnelRowKind) {
+ switch (seaTunnelRowKind) {
case DELETE:
return org.apache.paimon.types.RowKind.DELETE;
case UPDATE_AFTER:
@@ -43,9 +41,7 @@ public class RowKindConverter {
case INSERT:
return org.apache.paimon.types.RowKind.INSERT;
default:
- throw new PaimonConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- "Unsupported rowKind type " +
seaTunnelRowInd.shortString());
+ return null;
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index f38d353142..b250fd21e9 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -19,13 +19,14 @@ package
org.apache.seatunnel.connectors.seatunnel.paimon.utils;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.ArrayType;
@@ -53,13 +54,17 @@ import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
-import java.util.Arrays;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.List;
import java.util.Objects;
+@Slf4j
/** The converter for converting {@link RowType} and {@link SeaTunnelRowType}
*/
public class RowTypeConverter {
+ /** Field name reported in unsupported-type errors raised where no column name is available. */
+ private static final String UNKNOWN_FIELD = "UNKNOWN";
+
private RowTypeConverter() {}
/**
@@ -80,11 +85,20 @@ public class RowTypeConverter {
/**
* Convert Paimon row type {@link DataType} to SeaTunnel row type {@link
SeaTunnelDataType}
*
- * @param dataType Paimon data type
+ * @param typeDefine Paimon data type
* @return SeaTunnel data type {@link SeaTunnelDataType}
*/
- public static Column convert(DataType dataType) {
- PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder =
PhysicalColumn.builder();
+ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
+
+ PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder =
+ PhysicalColumn.builder()
+ .name(typeDefine.getName())
+ .sourceType(typeDefine.getColumnType())
+ .nullable(typeDefine.isNullable())
+ .defaultValue(typeDefine.getDefaultValue())
+ .comment(typeDefine.getComment());
+
+ DataType dataType = typeDefine.getNativeType();
SeaTunnelDataType<?> seaTunnelDataType;
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
PaimonToSeaTunnelTypeVisitor.INSTANCE;
@@ -156,6 +170,12 @@ public class RowTypeConverter {
break;
case ARRAY:
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType);
+ if (seaTunnelDataType == null) {
+ throw CommonError.unsupportedArrayGenericType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ dataType.getTypeRoot().toString(),
+ typeDefine.getName());
+ }
break;
case MAP:
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((MapType) dataType);
@@ -164,12 +184,10 @@ public class RowTypeConverter {
seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((RowType) dataType);
break;
default:
- String errorMsg =
- String.format(
- "Paimon dataType not support this genericType
[%s]",
- dataType.asSQLString());
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
+ throw CommonError.unsupportedDataType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ dataType.asSQLString(),
+ typeDefine.getName());
}
return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
@@ -182,16 +200,15 @@ public class RowTypeConverter {
*/
public static RowType reconvert(SeaTunnelRowType seaTunnelRowType,
TableSchema tableSchema) {
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ int totalFields = seaTunnelRowType.getTotalFields();
List<DataField> fields = tableSchema.fields();
- DataType[] dataTypes =
- Arrays.stream(fieldTypes)
- .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
- .toArray(DataType[]::new);
- DataField[] dataFields = new DataField[dataTypes.length];
- for (int i = 0; i < dataTypes.length; i++) {
- DataType dataType = dataTypes[i];
+ DataField[] dataFields = new DataField[totalFields];
+ for (int i = 0; i < totalFields; i++) {
+ String fieldName = fieldNames[i];
+ DataType dataType =
+ SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName,
fieldTypes[i]);
DataTypeRoot typeRoot = dataType.getTypeRoot();
- String fieldName = seaTunnelRowType.getFieldName(i);
if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
||
typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
DataField dataField = SchemaUtil.getDataField(fields,
fieldName);
@@ -209,18 +226,20 @@ public class RowTypeConverter {
* @param column SeaTunnel data type {@link Column}
* @return Paimon data type {@link DataType}
*/
- public static DataType reconvert(Column column) {
+ public static BasicTypeDefine<DataType> reconvert(Column column) {
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column);
}
/**
- * Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data
type {@link DataType}
+ * Mapping SeaTunnel data type {@link SeaTunnelDataType} of fieldName to
Paimon data type {@link
+ * DataType}
*
+ * @param fieldName SeaTunnel field name
* @param dataType SeaTunnel data type {@link SeaTunnelDataType}
* @return Paimon data type {@link DataType}
*/
- public static DataType reconvert(SeaTunnelDataType<?> dataType) {
- return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType);
+ public static DataType reconvert(String fieldName, SeaTunnelDataType<?>
dataType) {
+ return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName,
dataType);
}
/**
@@ -234,22 +253,109 @@ public class RowTypeConverter {
private SeaTunnelTypeToPaimonVisitor() {}
- public DataType visit(Column column) {
+ public BasicTypeDefine<DataType> visit(Column column) {
+ BasicTypeDefine.BasicTypeDefineBuilder<DataType> builder =
+ BasicTypeDefine.<DataType>builder()
+ .name(column.getName())
+ .nullable(column.isNullable())
+ .comment(column.getComment())
+ .defaultValue(column.getDefaultValue());
SeaTunnelDataType<?> dataType = column.getDataType();
Integer scale = column.getScale();
switch (dataType.getSqlType()) {
case TIMESTAMP:
- return DataTypes.TIMESTAMP(
- Objects.isNull(scale) ?
TimestampType.DEFAULT_PRECISION : scale);
+ int timestampScale =
+ Objects.isNull(scale) ?
TimestampType.DEFAULT_PRECISION : scale;
+ TimestampType timestampType =
DataTypes.TIMESTAMP(timestampScale);
+ builder.nativeType(timestampType);
+ builder.dataType(timestampType.getTypeRoot().name());
+ builder.columnType(timestampType.toString());
+ builder.scale(timestampScale);
+ builder.length(column.getColumnLength());
+ return builder.build();
case TIME:
- return DataTypes.TIME(
- Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION
: scale);
+ int timeScale = Objects.isNull(scale) ?
TimeType.DEFAULT_PRECISION : scale;
+ TimeType timeType = DataTypes.TIME(timeScale);
+ builder.nativeType(timeType);
+ builder.columnType(timeType.toString());
+ builder.dataType(timeType.getTypeRoot().name());
+ builder.scale(timeScale);
+ builder.length(column.getColumnLength());
+ return builder.build();
+ case DECIMAL:
+ org.apache.seatunnel.api.table.type.DecimalType
seatunnelDecimalType =
+ (org.apache.seatunnel.api.table.type.DecimalType)
dataType;
+ int precision = seatunnelDecimalType.getPrecision();
+ scale = seatunnelDecimalType.getScale();
+ if (precision <= 0) {
+ precision = DecimalType.DEFAULT_PRECISION;
+ scale = DecimalType.DEFAULT_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is
out of range, "
+ + "which is precision less than 0, "
+ + "it will be converted to
decimal({},{})",
+ column.getName(),
+ seatunnelDecimalType.getPrecision(),
+ seatunnelDecimalType.getScale(),
+ precision,
+ scale);
+ } else if (precision > DecimalType.MAX_PRECISION) {
+ scale = (int) Math.max(0, scale - (precision -
DecimalType.MAX_PRECISION));
+ precision = DecimalType.MAX_PRECISION;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is
out of range, "
+ + "which exceeds the maximum precision
of {}, "
+ + "it will be converted to
decimal({},{})",
+ column.getName(),
+ seatunnelDecimalType.getPrecision(),
+ seatunnelDecimalType.getScale(),
+ DecimalType.MAX_PRECISION,
+ precision,
+ scale);
+ }
+ if (scale < 0) {
+ scale = DecimalType.DEFAULT_SCALE;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is
out of range, "
+ + "which is scale less than 0, "
+ + "it will be converted to
decimal({},{})",
+ column.getName(),
+ seatunnelDecimalType.getPrecision(),
+ seatunnelDecimalType.getScale(),
+ precision,
+ scale);
+ } else if (scale > DecimalType.MAX_PRECISION) {
+ scale = DecimalType.MAX_PRECISION;
+ log.warn(
+ "The decimal column {} type decimal({},{}) is
out of range, "
+ + "which exceeds the maximum scale of
{}, "
+ + "it will be converted to
decimal({},{})",
+ column.getName(),
+ seatunnelDecimalType.getPrecision(),
+ seatunnelDecimalType.getScale(),
+ DecimalType.MAX_PRECISION,
+ precision,
+ scale);
+ }
+
+ DecimalType paimonDecimalType =
DataTypes.DECIMAL(precision, scale);
+ builder.nativeType(paimonDecimalType);
+ builder.columnType(paimonDecimalType.toString());
+ builder.dataType(paimonDecimalType.getTypeRoot().name());
+ builder.scale(scale);
+ builder.precision((long) precision);
+ builder.length(column.getColumnLength());
+ return builder.build();
default:
- return visit(dataType);
+ builder.nativeType(visit(column.getName(), dataType));
+ builder.columnType(dataType.toString());
+ builder.length(column.getColumnLength());
+ builder.dataType(dataType.getSqlType().name());
+ return builder.build();
}
}
- public DataType visit(SeaTunnelDataType<?> dataType) {
+ public DataType visit(String fieldName, SeaTunnelDataType<?> dataType)
{
switch (dataType.getSqlType()) {
case TINYINT:
return DataTypes.TINYINT();
@@ -288,22 +394,29 @@ public class RowTypeConverter {
SeaTunnelDataType<?> valueType =
((org.apache.seatunnel.api.table.type.MapType<?,
?>) dataType)
.getValueType();
- return DataTypes.MAP(visit(keyType), visit(valueType));
+ return DataTypes.MAP(visit(fieldName, keyType),
visit(fieldName, valueType));
case ARRAY:
SeaTunnelDataType<?> elementType =
((org.apache.seatunnel.api.table.type.ArrayType<?,
?>) dataType)
.getElementType();
- return DataTypes.ARRAY(visit(elementType));
+ return DataTypes.ARRAY(visit(fieldName, elementType));
case ROW:
- SeaTunnelDataType<?>[] fieldTypes =
- ((SeaTunnelRowType) dataType).getFieldTypes();
- DataType[] dataTypes =
-
Arrays.stream(fieldTypes).map(this::visit).toArray(DataType[]::new);
+ SeaTunnelRowType row = (SeaTunnelRowType) dataType;
+ SeaTunnelDataType<?>[] fieldTypes = row.getFieldTypes();
+ String[] fieldNames = row.getFieldNames();
+ int totalFields = row.getTotalFields();
+ DataType[] dataTypes = new DataType[totalFields];
+ for (int i = 0; i < totalFields; i++) {
+ dataTypes[i] =
+ SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(
+ fieldNames[i], fieldTypes[i]);
+ }
return DataTypes.ROW(dataTypes);
default:
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unsupported data type: " + dataType.getSqlType());
+ throw CommonError.unsupportedDataType(
+ PaimonConfig.CONNECTOR_IDENTITY,
+ dataType.getSqlType().toString(),
+ fieldName);
}
}
}
@@ -417,12 +530,7 @@ public class RowTypeConverter {
case DOUBLE:
return
org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE;
default:
- String errorMsg =
- String.format(
- "Array type not support this genericType
[%s]",
- seaTunnelArrayType);
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
+ return null;
}
}
@@ -445,9 +553,8 @@ public class RowTypeConverter {
@Override
protected SeaTunnelDataType defaultMethod(DataType dataType) {
- throw new PaimonConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unsupported data type: " + dataType);
+ throw CommonError.unsupportedDataType(
+ PaimonConfig.CONNECTOR_IDENTITY,
dataType.getTypeRoot().name(), UNKNOWN_FIELD);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index 65129dc8b7..0da047244f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.paimon.utils;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
@@ -34,7 +35,8 @@ import java.util.Objects;
public class SchemaUtil {
public static DataType toPaimonType(Column column) {
- return PaimonTypeMapper.INSTANCE.reconvert(column);
+ BasicTypeDefine<DataType> basicTypeDefine =
PaimonTypeMapper.INSTANCE.reconvert(column);
+ return basicTypeDefine.getNativeType();
}
public static Schema toPaimonSchema(
@@ -62,8 +64,8 @@ public class SchemaUtil {
return paiSchemaBuilder.build();
}
- public static Column toSeaTunnelType(DataType dataType) {
- return PaimonTypeMapper.INSTANCE.convert(dataType);
+ public static Column toSeaTunnelType(BasicTypeDefine<DataType> typeDefine)
{
+ return PaimonTypeMapper.INSTANCE.convert(typeDefine);
}
public static DataField getDataField(List<DataField> fields, String
fieldName) {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index 5e614aeda5..e075efde15 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -46,6 +49,10 @@ public class RowTypeConverterTest {
private RowType rowType;
+ private BasicTypeDefine<DataType> typeDefine;
+
+ private Column column;
+
private TableSchema tableSchema;
public static final RowType DEFAULT_ROW_TYPE =
@@ -148,10 +155,39 @@ public class RowTypeConverterTest {
KEY_NAME_LIST,
Collections.EMPTY_MAP,
"");
+
+ typeDefine =
+ BasicTypeDefine.<DataType>builder()
+ .name("c_decimal")
+ .comment("c_decimal_type_define")
+ .columnType("DECIMAL(30, 8)")
+ .nativeType(DataTypes.DECIMAL(30, 8))
+ .dataType(DataTypes.DECIMAL(30, 8).toString())
+ .length(30L)
+ .precision(30L)
+ .scale(8)
+ .defaultValue(3.0)
+ .nullable(false)
+ .build();
+
+ org.apache.seatunnel.api.table.type.DecimalType dataType =
+ new org.apache.seatunnel.api.table.type.DecimalType(30, 8);
+
+ column =
+ PhysicalColumn.builder()
+ .name("c_decimal")
+ .sourceType(DataTypes.DECIMAL(30, 8).toString())
+ .nullable(false)
+ .dataType(dataType)
+ .columnLength(30L)
+ .defaultValue(3.0)
+ .scale(8)
+ .comment("c_decimal_type_define")
+ .build();
}
@Test
- public void paimonToSeaTunnel() {
+ public void paimonRowTypeToSeaTunnel() {
SeaTunnelRowType convert = RowTypeConverter.convert(rowType);
Assertions.assertEquals(convert, seaTunnelRowType);
}
@@ -161,4 +197,27 @@ public class RowTypeConverterTest {
RowType convert = RowTypeConverter.reconvert(seaTunnelRowType,
tableSchema);
Assertions.assertEquals(convert, rowType);
}
+
+ @Test
+ public void paimonDataTypeToSeaTunnelColumn() {
+ Column column = RowTypeConverter.convert(typeDefine);
+ isEquals(column, typeDefine);
+ }
+
+ @Test
+ public void seaTunnelColumnToPaimonDataType() {
+ BasicTypeDefine<DataType> dataTypeDefine =
RowTypeConverter.reconvert(column);
+ isEquals(column, dataTypeDefine);
+ }
+
+ private void isEquals(Column column, BasicTypeDefine<DataType>
dataTypeDefine) {
+ Assertions.assertEquals(column.getComment(),
dataTypeDefine.getComment());
+ Assertions.assertEquals(column.getColumnLength(),
dataTypeDefine.getLength());
+ Assertions.assertEquals(column.getName(), dataTypeDefine.getName());
+ Assertions.assertEquals(column.isNullable(),
dataTypeDefine.isNullable());
+ Assertions.assertEquals(column.getDefaultValue(),
dataTypeDefine.getDefaultValue());
+ Assertions.assertEquals(column.getScale(), dataTypeDefine.getScale());
+ Assertions.assertTrue(
+
column.getDataType().toString().equalsIgnoreCase(dataTypeDefine.getColumnType()));
+ }
}