This is an automated email from the ASF dual-hosted git repository.
mbathori pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 7819280f24 NIFI-13593 PutIceberg issue with decimal scale
7819280f24 is described below
commit 7819280f246ec3d8864b436e0f081f98eaa3f892
Author: Krisztina Zsihovszki <[email protected]>
AuthorDate: Mon Jul 29 18:13:11 2024 +0200
NIFI-13593 PutIceberg issue with decimal scale
This closes #9121
Signed-off-by: Mark Bathori <[email protected]>
---
.../iceberg/converter/GenericDataConverters.java | 17 +++--
.../iceberg/TestIcebergRecordConverter.java | 86 +++++++++++-----------
2 files changed, 55 insertions(+), 48 deletions(-)
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index 61bce1a7e0..c2e0c539bf 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -187,14 +187,17 @@ public class GenericDataConverters {
if (data == null) {
return null;
}
- if (data instanceof BigDecimal) {
- BigDecimal bigDecimal = (BigDecimal) data;
- Validate.isTrue(bigDecimal.scale() == scale, "Cannot write
value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale,
bigDecimal.scale(), data);
- Validate.isTrue(bigDecimal.precision() <= precision, "Cannot
write value as decimal(%s,%s), invalid precision %s for value: %s",
- precision, scale, bigDecimal.precision(), data);
- return bigDecimal;
+
+ BigDecimal bigDecimal = DataTypeUtils.toBigDecimal(data, null);
+
+ if (bigDecimal.scale() < scale) {
+ bigDecimal = bigDecimal.setScale(scale);
}
- return DataTypeUtils.toBigDecimal(data, null);
+
+ Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value
as decimal(%s,%s), wrong scale %s for value: %s", precision, scale,
bigDecimal.scale(), data);
+ Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write
value as decimal(%s,%s), invalid precision %s for value: %s",
+ precision, scale, bigDecimal.precision(), data);
+ return bigDecimal;
}
}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index fdc9b0fa4e..5dc9a202e7 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -158,16 +158,17 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(3, "long", Types.LongType.get()),
Types.NestedField.optional(4, "double", Types.DoubleType.get()),
Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10,
2)),
- Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
- Types.NestedField.optional(7, "fixed",
Types.FixedType.ofLength(5)),
- Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
- Types.NestedField.optional(9, "date", Types.DateType.get()),
- Types.NestedField.optional(10, "time", Types.TimeType.get()),
- Types.NestedField.optional(11, "timestamp",
Types.TimestampType.withZone()),
- Types.NestedField.optional(12, "timestampTz",
Types.TimestampType.withoutZone()),
- Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
- Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
- Types.NestedField.optional(15, "enum", Types.StringType.get())
+ Types.NestedField.optional(6, "decimalLowerScore",
Types.DecimalType.of(10, 2)),
+ Types.NestedField.optional(7, "boolean", Types.BooleanType.get()),
+ Types.NestedField.optional(8, "fixed",
Types.FixedType.ofLength(5)),
+ Types.NestedField.optional(9, "binary", Types.BinaryType.get()),
+ Types.NestedField.optional(10, "date", Types.DateType.get()),
+ Types.NestedField.optional(11, "time", Types.TimeType.get()),
+ Types.NestedField.optional(12, "timestamp",
Types.TimestampType.withZone()),
+ Types.NestedField.optional(13, "timestampTz",
Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(14, "uuid", Types.UUIDType.get()),
+ Types.NestedField.optional(15, "choice", Types.IntegerType.get()),
+ Types.NestedField.optional(16, "enum", Types.StringType.get())
);
private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new
Schema(
@@ -286,6 +287,7 @@ public class TestIcebergRecordConverter {
fields.add(new RecordField("long",
RecordFieldType.LONG.getDataType()));
fields.add(new RecordField("double",
RecordFieldType.DOUBLE.getDataType()));
fields.add(new RecordField("decimal",
RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
+ fields.add(new RecordField("decimalLowerScore",
RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
fields.add(new RecordField("boolean",
RecordFieldType.BOOLEAN.getDataType()));
fields.add(new RecordField("fixed",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
fields.add(new RecordField("binary",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
@@ -463,6 +465,7 @@ public class TestIcebergRecordConverter {
values.put("long", 42L);
values.put("double", 3.14159D);
values.put("decimal", new BigDecimal("12345678.12"));
+ values.put("decimalLowerScore", 12345678.1);
values.put("boolean", true);
values.put("fixed", "hello".getBytes());
values.put("binary", "hello".getBytes());
@@ -588,20 +591,21 @@ public class TestIcebergRecordConverter {
assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4,
Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5,
BigDecimal.class));
- assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
- assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(7, byte[].class));
- assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, ByteBuffer.class).array());
- assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9,
LocalDate.class));
- assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10,
LocalTime.class));
- assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(11, OffsetDateTime.class));
- assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(12, LocalDateTime.class));
- assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
- assertEquals("blue", resultRecord.get(15, String.class));
+ assertEquals(new BigDecimal("12345678.10"), resultRecord.get(6,
BigDecimal.class));
+ assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, byte[].class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(9, ByteBuffer.class).array());
+ assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10,
LocalDate.class));
+ assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11,
LocalTime.class));
+ assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(12, OffsetDateTime.class));
+ assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(13, LocalDateTime.class));
+ assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
+ assertEquals("blue", resultRecord.get(16, String.class));
if (format.equals(PARQUET)) {
- assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+ assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0}, resultRecord.get(14, byte[].class));
} else {
- assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(13, UUID.class));
+ assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(14, UUID.class));
}
}
@@ -632,14 +636,14 @@ public class TestIcebergRecordConverter {
assertNull(resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4,
Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5,
BigDecimal.class));
- assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
- assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(7, byte[].class));
- assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, ByteBuffer.class).array());
- assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9,
LocalDate.class));
- assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10,
LocalTime.class));
- assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(11, OffsetDateTime.class));
- assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(12, LocalDateTime.class));
- assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+ assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, byte[].class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(9, ByteBuffer.class).array());
+ assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10,
LocalDate.class));
+ assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11,
LocalTime.class));
+ assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(12, OffsetDateTime.class));
+ assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(13, LocalDateTime.class));
+ assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
if (format.equals(FileFormat.PARQUET)) {
// Parquet uses a conversion to the byte values of numeric
characters such as "0" -> byte value 0
@@ -647,9 +651,9 @@ public class TestIcebergRecordConverter {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
- assertArrayEquals(byteBuffer.array(), resultRecord.get(13,
byte[].class));
+ assertArrayEquals(byteBuffer.array(), resultRecord.get(14,
byte[].class));
} else {
- assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(13, UUID.class));
+ assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(14, UUID.class));
}
// Test null values
@@ -718,14 +722,14 @@ public class TestIcebergRecordConverter {
assertNull(resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4,
Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5,
BigDecimal.class));
- assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
- assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(7, byte[].class));
- assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, ByteBuffer.class).array());
- assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9,
LocalDate.class));
- assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10,
LocalTime.class));
- assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(11, OffsetDateTime.class));
- assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(12, LocalDateTime.class));
- assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+ assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, byte[].class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(9, ByteBuffer.class).array());
+ assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10,
LocalDate.class));
+ assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11,
LocalTime.class));
+ assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(12, OffsetDateTime.class));
+ assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(13, LocalDateTime.class));
+ assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
if (format.equals(FileFormat.PARQUET)) {
// Parquet uses a conversion to the byte values of numeric
characters such as "0" -> byte value 0
@@ -733,9 +737,9 @@ public class TestIcebergRecordConverter {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
- assertArrayEquals(byteBuffer.array(), resultRecord.get(13,
byte[].class));
+ assertArrayEquals(byteBuffer.array(), resultRecord.get(14,
byte[].class));
} else {
- assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(13, UUID.class));
+ assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(14, UUID.class));
}
}