This is an automated email from the ASF dual-hosted git repository.

mbathori pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 7819280f24 NIFI-13593 PutIceberg issue with decimal scale
7819280f24 is described below

commit 7819280f246ec3d8864b436e0f081f98eaa3f892
Author: Krisztina Zsihovszki <[email protected]>
AuthorDate: Mon Jul 29 18:13:11 2024 +0200

    NIFI-13593 PutIceberg issue with decimal scale
    
    This closes #9121
    
    Signed-off-by: Mark Bathori <[email protected]>
---
 .../iceberg/converter/GenericDataConverters.java   | 17 +++--
 .../iceberg/TestIcebergRecordConverter.java        | 86 +++++++++++-----------
 2 files changed, 55 insertions(+), 48 deletions(-)

diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index 61bce1a7e0..c2e0c539bf 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -187,14 +187,17 @@ public class GenericDataConverters {
             if (data == null) {
                 return null;
             }
-            if (data instanceof BigDecimal) {
-                BigDecimal bigDecimal = (BigDecimal) data;
-                Validate.isTrue(bigDecimal.scale() == scale, "Cannot write 
value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, 
bigDecimal.scale(), data);
-                Validate.isTrue(bigDecimal.precision() <= precision, "Cannot 
write value as decimal(%s,%s), invalid precision %s for value: %s",
-                        precision, scale, bigDecimal.precision(), data);
-                return bigDecimal;
+
+            BigDecimal bigDecimal = DataTypeUtils.toBigDecimal(data, null);
+
+            if (bigDecimal.scale() < scale) {
+                bigDecimal = bigDecimal.setScale(scale);
             }
-            return DataTypeUtils.toBigDecimal(data, null);
+
+            Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value 
as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, 
bigDecimal.scale(), data);
+            Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write 
value as decimal(%s,%s), invalid precision %s for value: %s",
+                    precision, scale, bigDecimal.precision(), data);
+            return bigDecimal;
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index fdc9b0fa4e..5dc9a202e7 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -158,16 +158,17 @@ public class TestIcebergRecordConverter {
             Types.NestedField.optional(3, "long", Types.LongType.get()),
             Types.NestedField.optional(4, "double", Types.DoubleType.get()),
             Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 
2)),
-            Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
-            Types.NestedField.optional(7, "fixed", 
Types.FixedType.ofLength(5)),
-            Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
-            Types.NestedField.optional(9, "date", Types.DateType.get()),
-            Types.NestedField.optional(10, "time", Types.TimeType.get()),
-            Types.NestedField.optional(11, "timestamp", 
Types.TimestampType.withZone()),
-            Types.NestedField.optional(12, "timestampTz", 
Types.TimestampType.withoutZone()),
-            Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
-            Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
-            Types.NestedField.optional(15, "enum", Types.StringType.get())
+            Types.NestedField.optional(6, "decimalLowerScore", 
Types.DecimalType.of(10, 2)),
+            Types.NestedField.optional(7, "boolean", Types.BooleanType.get()),
+            Types.NestedField.optional(8, "fixed", 
Types.FixedType.ofLength(5)),
+            Types.NestedField.optional(9, "binary", Types.BinaryType.get()),
+            Types.NestedField.optional(10, "date", Types.DateType.get()),
+            Types.NestedField.optional(11, "time", Types.TimeType.get()),
+            Types.NestedField.optional(12, "timestamp", 
Types.TimestampType.withZone()),
+            Types.NestedField.optional(13, "timestampTz", 
Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(14, "uuid", Types.UUIDType.get()),
+            Types.NestedField.optional(15, "choice", Types.IntegerType.get()),
+            Types.NestedField.optional(16, "enum", Types.StringType.get())
     );
 
     private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new 
Schema(
@@ -286,6 +287,7 @@ public class TestIcebergRecordConverter {
         fields.add(new RecordField("long", 
RecordFieldType.LONG.getDataType()));
         fields.add(new RecordField("double", 
RecordFieldType.DOUBLE.getDataType()));
         fields.add(new RecordField("decimal", 
RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
+        fields.add(new RecordField("decimalLowerScore", 
RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
         fields.add(new RecordField("boolean", 
RecordFieldType.BOOLEAN.getDataType()));
         fields.add(new RecordField("fixed", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
         fields.add(new RecordField("binary", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
@@ -463,6 +465,7 @@ public class TestIcebergRecordConverter {
         values.put("long", 42L);
         values.put("double", 3.14159D);
         values.put("decimal", new BigDecimal("12345678.12"));
+        values.put("decimalLowerScore", 12345678.1);
         values.put("boolean", true);
         values.put("fixed", "hello".getBytes());
         values.put("binary", "hello".getBytes());
@@ -588,20 +591,21 @@ public class TestIcebergRecordConverter {
         assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
         assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, 
Double.class));
         assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, 
BigDecimal.class));
-        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(7, byte[].class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, ByteBuffer.class).array());
-        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, 
LocalDate.class));
-        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, 
LocalTime.class));
-        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(11, OffsetDateTime.class));
-        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(12, LocalDateTime.class));
-        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
-        assertEquals("blue", resultRecord.get(15, String.class));
+        assertEquals(new BigDecimal("12345678.10"), resultRecord.get(6, 
BigDecimal.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(9, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, 
LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, 
LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(12, OffsetDateTime.class));
+        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(13, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
+        assertEquals("blue", resultRecord.get(16, String.class));
 
         if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0}, resultRecord.get(14, byte[].class));
         } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(13, UUID.class));
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(14, UUID.class));
         }
     }
 
@@ -632,14 +636,14 @@ public class TestIcebergRecordConverter {
         assertNull(resultRecord.get(3, Long.class));
         assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, 
Double.class));
         assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, 
BigDecimal.class));
-        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(7, byte[].class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, ByteBuffer.class).array());
-        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, 
LocalDate.class));
-        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, 
LocalTime.class));
-        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(11, OffsetDateTime.class));
-        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(12, LocalDateTime.class));
-        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(9, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, 
LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, 
LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(12, OffsetDateTime.class));
+        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(13, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
 
         if (format.equals(FileFormat.PARQUET)) {
             // Parquet uses a conversion to the byte values of numeric 
characters such as "0" -> byte value 0
@@ -647,9 +651,9 @@ public class TestIcebergRecordConverter {
             ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
             byteBuffer.putLong(uuid.getMostSignificantBits());
             byteBuffer.putLong(uuid.getLeastSignificantBits());
-            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, 
byte[].class));
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(14, 
byte[].class));
         } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(13, UUID.class));
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(14, UUID.class));
         }
 
         // Test null values
@@ -718,14 +722,14 @@ public class TestIcebergRecordConverter {
         assertNull(resultRecord.get(3, Long.class));
         assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, 
Double.class));
         assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, 
BigDecimal.class));
-        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(7, byte[].class));
-        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, ByteBuffer.class).array());
-        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, 
LocalDate.class));
-        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, 
LocalTime.class));
-        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(11, OffsetDateTime.class));
-        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(12, LocalDateTime.class));
-        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(9, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, 
LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, 
LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(12, OffsetDateTime.class));
+        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(13, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
 
         if (format.equals(FileFormat.PARQUET)) {
             // Parquet uses a conversion to the byte values of numeric 
characters such as "0" -> byte value 0
@@ -733,9 +737,9 @@ public class TestIcebergRecordConverter {
             ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
             byteBuffer.putLong(uuid.getMostSignificantBits());
             byteBuffer.putLong(uuid.getLeastSignificantBits());
-            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, 
byte[].class));
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(14, 
byte[].class));
         } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(13, UUID.class));
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(14, UUID.class));
         }
     }
 

Reply via email to