This is an automated email from the ASF dual-hosted git repository.

mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b3c6d8d98a NIFI-13593 PutIceberg issue with decimal scale
b3c6d8d98a is described below

commit b3c6d8d98ab80ace6333331846aa8f4f21a54b43
Author: Krisztina Zsihovszki <[email protected]>
AuthorDate: Mon Jul 29 18:13:11 2024 +0200

    NIFI-13593 PutIceberg issue with decimal scale
    
    This closes #9121
    
    Signed-off-by: Mark Bathori <[email protected]>
---
 .../iceberg/converter/GenericDataConverters.java   | 16 ++--
 .../iceberg/TestIcebergRecordConverter.java        | 86 +++++++++++-----------
 2 files changed, 55 insertions(+), 47 deletions(-)

diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index b7dec0b670..e11041c4fc 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -188,13 +188,17 @@ public class GenericDataConverters {
             if (data == null) {
                 return null;
             }
-            if (data instanceof BigDecimal bigDecimal) {
-                Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
-                Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
-                        precision, scale, bigDecimal.precision(), data);
-                return bigDecimal;
+
+            BigDecimal bigDecimal = DataTypeUtils.toBigDecimal(data, null);
+
+            if (bigDecimal.scale() < scale) {
+                bigDecimal = bigDecimal.setScale(scale);
             }
-            return DataTypeUtils.toBigDecimal(data, null);
+
+            Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
+            Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
+                    precision, scale, bigDecimal.precision(), data);
+            return bigDecimal;
         }
     }
 
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index 6314a46f02..4423541b24 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -156,16 +156,17 @@ public class TestIcebergRecordConverter {
             Types.NestedField.optional(3, "long", Types.LongType.get()),
             Types.NestedField.optional(4, "double", Types.DoubleType.get()),
             Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
-            Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
-            Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
-            Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
-            Types.NestedField.optional(9, "date", Types.DateType.get()),
-            Types.NestedField.optional(10, "time", Types.TimeType.get()),
-            Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
-            Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
-            Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
-            Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
-            Types.NestedField.optional(15, "enum", Types.StringType.get())
+            Types.NestedField.optional(6, "decimalLowerScore", Types.DecimalType.of(10, 2)),
+            Types.NestedField.optional(7, "boolean", Types.BooleanType.get()),
+            Types.NestedField.optional(8, "fixed", Types.FixedType.ofLength(5)),
+            Types.NestedField.optional(9, "binary", Types.BinaryType.get()),
+            Types.NestedField.optional(10, "date", Types.DateType.get()),
+            Types.NestedField.optional(11, "time", Types.TimeType.get()),
+            Types.NestedField.optional(12, "timestamp", Types.TimestampType.withZone()),
+            Types.NestedField.optional(13, "timestampTz", Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(14, "uuid", Types.UUIDType.get()),
+            Types.NestedField.optional(15, "choice", Types.IntegerType.get()),
+            Types.NestedField.optional(16, "enum", Types.StringType.get())
     );
 
     private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema(
@@ -284,6 +285,7 @@ public class TestIcebergRecordConverter {
         fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
         fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
         fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
+        fields.add(new RecordField("decimalLowerScore", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
         fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
         fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
         fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
@@ -460,6 +462,7 @@ public class TestIcebergRecordConverter {
         values.put("long", 42L);
         values.put("double", 3.14159D);
         values.put("decimal", new BigDecimal("12345678.12"));
+        values.put("decimalLowerScore", 12345678.1);
         values.put("boolean", true);
         values.put("fixed", "hello".getBytes());
         values.put("binary", "hello".getBytes());
@@ -583,20 +586,21 @@ public class TestIcebergRecordConverter {
         assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
         assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
         assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
-        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
-        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
-        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
-        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
-        assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
-        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
-        assertEquals("blue", resultRecord.get(15, String.class));
+        assertEquals(new BigDecimal("12345678.10"), resultRecord.get(6, BigDecimal.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(9, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(12, OffsetDateTime.class));
+        assertEquals(LOCAL_DATE_TIME, resultRecord.get(13, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
+        assertEquals("blue", resultRecord.get(16, String.class));
 
         if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(14, byte[].class));
         } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(14, UUID.class));
         }
     }
 
@@ -626,14 +630,14 @@ public class TestIcebergRecordConverter {
         assertNull(resultRecord.get(3, Long.class));
         assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
         assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
-        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
-        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
-        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
-        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
-        assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
-        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(9, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(12, OffsetDateTime.class));
+        assertEquals(LOCAL_DATE_TIME, resultRecord.get(13, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
 
         if (format.equals(FileFormat.PARQUET)) {
             // Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0
@@ -641,9 +645,9 @@ public class TestIcebergRecordConverter {
             ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
             byteBuffer.putLong(uuid.getMostSignificantBits());
             byteBuffer.putLong(uuid.getLeastSignificantBits());
-            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(14, byte[].class));
         } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(14, UUID.class));
         }
 
         // Test null values
@@ -711,14 +715,14 @@ public class TestIcebergRecordConverter {
         assertNull(resultRecord.get(3, Long.class));
         assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
         assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
-        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
-        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
-        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
-        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
-        assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
-        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(9, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(12, OffsetDateTime.class));
+        assertEquals(LOCAL_DATE_TIME, resultRecord.get(13, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
 
         if (format.equals(FileFormat.PARQUET)) {
             // Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0
@@ -726,9 +730,9 @@ public class TestIcebergRecordConverter {
             ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
             byteBuffer.putLong(uuid.getMostSignificantBits());
             byteBuffer.putLong(uuid.getLeastSignificantBits());
-            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(14, byte[].class));
         } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(14, UUID.class));
         }
     }
 

Reply via email to