This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37762c93f0 [Bug][Connector-V2] fix NPE when decimal type precision is 
incompatible for Paimon (#9452)
37762c93f0 is described below

commit 37762c93f06ed108a33d9ea1fb252eda9c68225c
Author: zhangdonghao <[email protected]>
AuthorDate: Tue Jun 24 14:37:33 2025 +0800

    [Bug][Connector-V2] fix NPE when decimal type precision is incompatible for 
Paimon (#9452)
---
 .../paimon/exception/PaimonConnectorErrorCode.java |  5 +-
 .../seatunnel/paimon/utils/RowConverter.java       | 66 ++++++++++++++--------
 .../seatunnel/paimon/utils/RowConverterTest.java   | 31 ++++++++++
 3 files changed, 76 insertions(+), 26 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index 237c7c2448..868732d4b7 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -29,7 +29,10 @@ public enum PaimonConnectorErrorCode implements 
SeaTunnelErrorCode {
     LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
     GET_FILED_FAILED("PAIMON-07", "Get field failed"),
     UNSUPPORTED_PRIMARY_DATATYPE("PAIMON-08", "Paimon primary key datatype is 
unsupported"),
-    WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in 
dynamic bucket mode");
+    WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in 
dynamic bucket mode"),
+    NON_PRIMARY_KEY_CHECK_ERROR(
+            "PAIMON-10", "Primary keys should be empty when nonPrimaryKey is 
true"),
+    DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is 
incompatible. ");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 8415f752e3..56305aafe1 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.paimon.data.BinaryArray;
@@ -51,6 +53,7 @@ import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.DateTimeUtils;
 
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -362,7 +365,9 @@ public class RowConverter {
         int sourceTotalFields = seaTunnelRowType.getTotalFields();
         if (sourceTotalFields != sinkTotalFields.size()) {
             throw CommonError.writeRowErrorWithFieldsCountNotMatch(
-                    "Paimon", sourceTotalFields, sinkTotalFields.size());
+                    PaimonBaseOptions.CONNECTOR_IDENTITY,
+                    sourceTotalFields,
+                    sinkTotalFields.size());
         }
         BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
         BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
@@ -378,31 +383,32 @@ public class RowConverter {
         binaryRow.setRowKind(rowKind);
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
         for (int i = 0; i < fieldTypes.length; i++) {
+            Object fieldValue = seaTunnelRow.getField(i);
             // judge the field is or not equals null
-            if (seaTunnelRow.getField(i) == null) {
+            if (fieldValue == null) {
                 binaryWriter.setNullAt(i);
                 continue;
             }
-            checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields);
+            checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields, 
fieldValue);
             String fieldName = seaTunnelRowType.getFieldName(i);
             switch (fieldTypes[i].getSqlType()) {
                 case TINYINT:
-                    binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
+                    binaryWriter.writeByte(i, (Byte) fieldValue);
                     break;
                 case SMALLINT:
-                    binaryWriter.writeShort(i, (Short) 
seaTunnelRow.getField(i));
+                    binaryWriter.writeShort(i, (Short) fieldValue);
                     break;
                 case INT:
-                    binaryWriter.writeInt(i, (Integer) 
seaTunnelRow.getField(i));
+                    binaryWriter.writeInt(i, (Integer) fieldValue);
                     break;
                 case BIGINT:
-                    binaryWriter.writeLong(i, (Long) seaTunnelRow.getField(i));
+                    binaryWriter.writeLong(i, (Long) fieldValue);
                     break;
                 case FLOAT:
-                    binaryWriter.writeFloat(i, (Float) 
seaTunnelRow.getField(i));
+                    binaryWriter.writeFloat(i, (Float) fieldValue);
                     break;
                 case DOUBLE:
-                    binaryWriter.writeDouble(i, (Double) 
seaTunnelRow.getField(i));
+                    binaryWriter.writeDouble(i, (Double) fieldValue);
                     break;
                 case DECIMAL:
                     DataField decimalDataField =
@@ -418,29 +424,28 @@ public class RowConverter {
                             decimalType.getPrecision());
                     break;
                 case STRING:
-                    binaryWriter.writeString(
-                            i, BinaryString.fromString((String) 
seaTunnelRow.getField(i)));
+                    binaryWriter.writeString(i, 
BinaryString.fromString((String) fieldValue));
                     break;
                 case BYTES:
-                    binaryWriter.writeBinary(i, (byte[]) 
seaTunnelRow.getField(i));
+                    binaryWriter.writeBinary(i, (byte[]) fieldValue);
                     break;
                 case BOOLEAN:
-                    binaryWriter.writeBoolean(i, (Boolean) 
seaTunnelRow.getField(i));
+                    binaryWriter.writeBoolean(i, (Boolean) fieldValue);
                     break;
                 case DATE:
-                    LocalDate date = (LocalDate) seaTunnelRow.getField(i);
+                    LocalDate date = (LocalDate) fieldValue;
                     BinaryWriter.createValueSetter(DataTypes.DATE())
                             .setValue(binaryWriter, i, 
DateTimeUtils.toInternal(date));
                     break;
                 case TIMESTAMP:
                     DataField dataField = 
SchemaUtil.getDataField(sinkTotalFields, fieldName);
                     int precision = ((TimestampType) 
dataField.type()).getPrecision();
-                    LocalDateTime datetime = (LocalDateTime) 
seaTunnelRow.getField(i);
+                    LocalDateTime datetime = (LocalDateTime) fieldValue;
                     binaryWriter.writeTimestamp(
                             i, Timestamp.fromLocalDateTime(datetime), 
precision);
                     break;
                 case TIME:
-                    LocalTime time = (LocalTime) seaTunnelRow.getField(i);
+                    LocalTime time = (LocalTime) fieldValue;
                     BinaryWriter.createValueSetter(DataTypes.TIME())
                             .setValue(binaryWriter, i, 
DateTimeUtils.toInternal(time));
                     break;
@@ -450,7 +455,7 @@ public class RowConverter {
                     SeaTunnelDataType<?> valueType = mapType.getValueType();
                     DataType paimonKeyType = 
RowTypeConverter.reconvert(fieldName, keyType);
                     DataType paimonValueType = 
RowTypeConverter.reconvert(fieldName, valueType);
-                    Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
+                    Map<?, ?> field = (Map<?, ?>) fieldValue;
                     Object[] keys = field.keySet().toArray(new Object[0]);
                     Object[] values = field.values().toArray(new Object[0]);
                     binaryWriter.writeMap(
@@ -463,10 +468,7 @@ public class RowConverter {
                 case ARRAY:
                     ArrayType<?, ?> arrayType = (ArrayType<?, ?>) 
seaTunnelRowType.getFieldType(i);
                     BinaryArray paimonArray =
-                            reconvert(
-                                    fieldName,
-                                    seaTunnelRow.getField(i),
-                                    arrayType.getElementType());
+                            reconvert(fieldName, fieldValue, 
arrayType.getElementType());
                     binaryWriter.writeArray(
                             i,
                             paimonArray,
@@ -476,7 +478,7 @@ public class RowConverter {
                     break;
                 case ROW:
                     SeaTunnelDataType<?> rowType = 
seaTunnelRowType.getFieldType(i);
-                    Object row = seaTunnelRow.getField(i);
+                    Object row = fieldValue;
                     InternalRow paimonRow =
                             reconvert(
                                     (SeaTunnelRow) row,
@@ -497,7 +499,7 @@ public class RowConverter {
     }
 
     private static void checkCanWriteWithSchema(
-            int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
+            int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields, 
Object fieldValue) {
         String sourceFieldName = seaTunnelRowType.getFieldName(i);
         SeaTunnelDataType<?> sourceFieldType = 
seaTunnelRowType.getFieldType(i);
         DataField sinkDataField = fields.get(i);
@@ -508,7 +510,7 @@ public class RowConverter {
         if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())
                 || !StringUtils.equals(sourceFieldName, sinkDataField.name())) 
{
             throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
-                    "Paimon",
+                    PaimonBaseOptions.CONNECTOR_IDENTITY,
                     sourceFieldName + StringUtils.SPACE + 
sourceFieldType.getSqlType(),
                     exceptDataField.asSQLString(),
                     sinkDataField.asSQLString());
@@ -521,11 +523,25 @@ public class RowConverter {
             if (sinkDecimalType.getPrecision() < 
sourceDecimalType.getPrecision()
                     || sinkDecimalType.getScale() < 
sourceDecimalType.getScale()) {
                 throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
-                        "Paimon",
+                        PaimonBaseOptions.CONNECTOR_IDENTITY,
                         sourceFieldName + StringUtils.SPACE + 
sourceFieldType.getSqlType(),
                         exceptDataField.asSQLString(),
                         sinkDataField.asSQLString());
             }
+            BigDecimal bd =
+                    ((BigDecimal) fieldValue)
+                            .setScale(sinkDecimalType.getScale(), 
RoundingMode.HALF_UP);
+            if (bd.precision() > sinkDecimalType.getPrecision()) {
+                String message =
+                        String.format(
+                                "`%s` field value is: %s, except field schema 
of sink is %s, but the field in sink table with actual schema is %s. Please 
check the schema of the sink table.",
+                                sourceFieldName,
+                                fieldValue,
+                                exceptDataField.asSQLString(),
+                                sinkDataField.asSQLString());
+                throw new PaimonConnectorException(
+                        
PaimonConnectorErrorCode.DECIMAL_PRECISION_INCOMPATIBLE, message);
+            }
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index c50df02435..b857fddcbd 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.paimon.data.BinaryArray;
@@ -335,4 +336,34 @@ public class RowConverterTest {
                 RowConverter.convert(internalRow, seaTunnelRowType, 
getTableSchema(10, 10));
         Assertions.assertEquals(convert, seaTunnelRow);
     }
+
+    @Test
+    public void decimalToPaimon() {
+        SeaTunnelRowType sourceType =
+                new SeaTunnelRowType(
+                        new String[] {"f0"}, new SeaTunnelDataType[] {new 
DecimalType(4, 1)});
+        TableSchema sinkSchema =
+                new TableSchema(
+                        0,
+                        TableSchema.newFields(RowType.of(DataTypes.DECIMAL(4, 
2))),
+                        1,
+                        Collections.EMPTY_LIST,
+                        KEY_NAME_LIST,
+                        Collections.EMPTY_MAP,
+                        "");
+        SeaTunnelRow data = new SeaTunnelRow(new Object[] {new 
BigDecimal("123.4")});
+
+        Assertions.assertThrowsExactly(
+                PaimonConnectorException.class,
+                () -> {
+                    try {
+                        RowConverter.reconvert(data, sourceType, sinkSchema);
+                    } catch (Exception e) {
+                        Assertions.assertEquals(
+                                "ErrorCode:[PAIMON-11], 
ErrorDescription:[decimal type precision is incompatible. ] - `f0` field value 
is: 123.4, except field schema of sink is `f0` DECIMAL(4, 1), but the field in 
sink table with actual schema is `f0` DECIMAL(4, 2). Please check the schema of 
the sink table.",
+                                e.getMessage());
+                        throw e;
+                    }
+                });
+    }
 }

Reply via email to