This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37762c93f0 [Bug][Connector-V2] fix NPE when decimal type precision is
incompatible for Paimon (#9452)
37762c93f0 is described below
commit 37762c93f06ed108a33d9ea1fb252eda9c68225c
Author: zhangdonghao <[email protected]>
AuthorDate: Tue Jun 24 14:37:33 2025 +0800
[Bug][Connector-V2] fix NPE when decimal type precision is incompatible for
Paimon (#9452)
---
.../paimon/exception/PaimonConnectorErrorCode.java | 5 +-
.../seatunnel/paimon/utils/RowConverter.java | 66 ++++++++++++++--------
.../seatunnel/paimon/utils/RowConverterTest.java | 31 ++++++++++
3 files changed, 76 insertions(+), 26 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index 237c7c2448..868732d4b7 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -29,7 +29,10 @@ public enum PaimonConnectorErrorCode implements
SeaTunnelErrorCode {
LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
GET_FILED_FAILED("PAIMON-07", "Get field failed"),
UNSUPPORTED_PRIMARY_DATATYPE("PAIMON-08", "Paimon primary key datatype is
unsupported"),
- WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in
dynamic bucket mode");
+ WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in
dynamic bucket mode"),
+ NON_PRIMARY_KEY_CHECK_ERROR(
+ "PAIMON-10", "Primary keys should be empty when nonPrimaryKey is
true"),
+ DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is
incompatible. ");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 8415f752e3..56305aafe1 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryArray;
@@ -51,6 +53,7 @@ import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.DateTimeUtils;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -362,7 +365,9 @@ public class RowConverter {
int sourceTotalFields = seaTunnelRowType.getTotalFields();
if (sourceTotalFields != sinkTotalFields.size()) {
throw CommonError.writeRowErrorWithFieldsCountNotMatch(
- "Paimon", sourceTotalFields, sinkTotalFields.size());
+ PaimonBaseOptions.CONNECTOR_IDENTITY,
+ sourceTotalFields,
+ sinkTotalFields.size());
}
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
@@ -378,31 +383,32 @@ public class RowConverter {
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
+ Object fieldValue = seaTunnelRow.getField(i);
// judge the field is or not equals null
- if (seaTunnelRow.getField(i) == null) {
+ if (fieldValue == null) {
binaryWriter.setNullAt(i);
continue;
}
- checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields);
+ checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields,
fieldValue);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
- binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
+ binaryWriter.writeByte(i, (Byte) fieldValue);
break;
case SMALLINT:
- binaryWriter.writeShort(i, (Short)
seaTunnelRow.getField(i));
+ binaryWriter.writeShort(i, (Short) fieldValue);
break;
case INT:
- binaryWriter.writeInt(i, (Integer)
seaTunnelRow.getField(i));
+ binaryWriter.writeInt(i, (Integer) fieldValue);
break;
case BIGINT:
- binaryWriter.writeLong(i, (Long) seaTunnelRow.getField(i));
+ binaryWriter.writeLong(i, (Long) fieldValue);
break;
case FLOAT:
- binaryWriter.writeFloat(i, (Float)
seaTunnelRow.getField(i));
+ binaryWriter.writeFloat(i, (Float) fieldValue);
break;
case DOUBLE:
- binaryWriter.writeDouble(i, (Double)
seaTunnelRow.getField(i));
+ binaryWriter.writeDouble(i, (Double) fieldValue);
break;
case DECIMAL:
DataField decimalDataField =
@@ -418,29 +424,28 @@ public class RowConverter {
decimalType.getPrecision());
break;
case STRING:
- binaryWriter.writeString(
- i, BinaryString.fromString((String)
seaTunnelRow.getField(i)));
+ binaryWriter.writeString(i,
BinaryString.fromString((String) fieldValue));
break;
case BYTES:
- binaryWriter.writeBinary(i, (byte[])
seaTunnelRow.getField(i));
+ binaryWriter.writeBinary(i, (byte[]) fieldValue);
break;
case BOOLEAN:
- binaryWriter.writeBoolean(i, (Boolean)
seaTunnelRow.getField(i));
+ binaryWriter.writeBoolean(i, (Boolean) fieldValue);
break;
case DATE:
- LocalDate date = (LocalDate) seaTunnelRow.getField(i);
+ LocalDate date = (LocalDate) fieldValue;
BinaryWriter.createValueSetter(DataTypes.DATE())
.setValue(binaryWriter, i,
DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
DataField dataField =
SchemaUtil.getDataField(sinkTotalFields, fieldName);
int precision = ((TimestampType)
dataField.type()).getPrecision();
- LocalDateTime datetime = (LocalDateTime)
seaTunnelRow.getField(i);
+ LocalDateTime datetime = (LocalDateTime) fieldValue;
binaryWriter.writeTimestamp(
i, Timestamp.fromLocalDateTime(datetime),
precision);
break;
case TIME:
- LocalTime time = (LocalTime) seaTunnelRow.getField(i);
+ LocalTime time = (LocalTime) fieldValue;
BinaryWriter.createValueSetter(DataTypes.TIME())
.setValue(binaryWriter, i,
DateTimeUtils.toInternal(time));
break;
@@ -450,7 +455,7 @@ public class RowConverter {
SeaTunnelDataType<?> valueType = mapType.getValueType();
DataType paimonKeyType =
RowTypeConverter.reconvert(fieldName, keyType);
DataType paimonValueType =
RowTypeConverter.reconvert(fieldName, valueType);
- Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
+ Map<?, ?> field = (Map<?, ?>) fieldValue;
Object[] keys = field.keySet().toArray(new Object[0]);
Object[] values = field.values().toArray(new Object[0]);
binaryWriter.writeMap(
@@ -463,10 +468,7 @@ public class RowConverter {
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>)
seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
- reconvert(
- fieldName,
- seaTunnelRow.getField(i),
- arrayType.getElementType());
+ reconvert(fieldName, fieldValue,
arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
@@ -476,7 +478,7 @@ public class RowConverter {
break;
case ROW:
SeaTunnelDataType<?> rowType =
seaTunnelRowType.getFieldType(i);
- Object row = seaTunnelRow.getField(i);
+ Object row = fieldValue;
InternalRow paimonRow =
reconvert(
(SeaTunnelRow) row,
@@ -497,7 +499,7 @@ public class RowConverter {
}
private static void checkCanWriteWithSchema(
- int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
+ int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields,
Object fieldValue) {
String sourceFieldName = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType<?> sourceFieldType =
seaTunnelRowType.getFieldType(i);
DataField sinkDataField = fields.get(i);
@@ -508,7 +510,7 @@ public class RowConverter {
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())
|| !StringUtils.equals(sourceFieldName, sinkDataField.name()))
{
throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
- "Paimon",
+ PaimonBaseOptions.CONNECTOR_IDENTITY,
sourceFieldName + StringUtils.SPACE +
sourceFieldType.getSqlType(),
exceptDataField.asSQLString(),
sinkDataField.asSQLString());
@@ -521,11 +523,25 @@ public class RowConverter {
if (sinkDecimalType.getPrecision() <
sourceDecimalType.getPrecision()
|| sinkDecimalType.getScale() <
sourceDecimalType.getScale()) {
throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
- "Paimon",
+ PaimonBaseOptions.CONNECTOR_IDENTITY,
sourceFieldName + StringUtils.SPACE +
sourceFieldType.getSqlType(),
exceptDataField.asSQLString(),
sinkDataField.asSQLString());
}
+ BigDecimal bd =
+ ((BigDecimal) fieldValue)
+ .setScale(sinkDecimalType.getScale(),
RoundingMode.HALF_UP);
+ if (bd.precision() > sinkDecimalType.getPrecision()) {
+ String message =
+ String.format(
+ "`%s` field value is: %s, except field schema
of sink is %s, but the field in sink table with actual schema is %s. Please
check the schema of the sink table.",
+ sourceFieldName,
+ fieldValue,
+ exceptDataField.asSQLString(),
+ sinkDataField.asSQLString());
+ throw new PaimonConnectorException(
+
PaimonConnectorErrorCode.DECIMAL_PRECISION_INCOMPATIBLE, message);
+ }
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index c50df02435..b857fddcbd 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryArray;
@@ -335,4 +336,34 @@ public class RowConverterTest {
RowConverter.convert(internalRow, seaTunnelRowType,
getTableSchema(10, 10));
Assertions.assertEquals(convert, seaTunnelRow);
}
+
+ @Test
+ public void decimalToPaimon() {
+ SeaTunnelRowType sourceType =
+ new SeaTunnelRowType(
+ new String[] {"f0"}, new SeaTunnelDataType[] {new
DecimalType(4, 1)});
+ TableSchema sinkSchema =
+ new TableSchema(
+ 0,
+ TableSchema.newFields(RowType.of(DataTypes.DECIMAL(4,
2))),
+ 1,
+ Collections.EMPTY_LIST,
+ KEY_NAME_LIST,
+ Collections.EMPTY_MAP,
+ "");
+ SeaTunnelRow data = new SeaTunnelRow(new Object[] {new
BigDecimal("123.4")});
+
+ Assertions.assertThrowsExactly(
+ PaimonConnectorException.class,
+ () -> {
+ try {
+ RowConverter.reconvert(data, sourceType, sinkSchema);
+ } catch (Exception e) {
+ Assertions.assertEquals(
+ "ErrorCode:[PAIMON-11],
ErrorDescription:[decimal type precision is incompatible. ] - `f0` field value
is: 123.4, except field schema of sink is `f0` DECIMAL(4, 1), but the field in
sink table with actual schema is `f0` DECIMAL(4, 2). Please check the schema of
the sink table.",
+ e.getMessage());
+ throw e;
+ }
+ });
+ }
}