This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new e98a2ef5fd NIFI-11177: Add defensive code for null values for Iceberg
e98a2ef5fd is described below
commit e98a2ef5fd7b0c4229545130f6c5aebb670e8d27
Author: Matt Burgess <[email protected]>
AuthorDate: Thu Sep 21 23:56:58 2023 -0400
NIFI-11177: Add defensive code for null values for Iceberg
Signed-off-by: Pierre Villard <[email protected]>
This closes #7777.
---
.../serialization/record/util/DataTypeUtils.java | 2 +-
.../iceberg/converter/GenericDataConverters.java | 29 +++++++++++++++++++---
.../iceberg/TestIcebergRecordConverter.java | 28 +++++++++++++++++++++
3 files changed, 55 insertions(+), 4 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 8752d85b19..7464b28ee9 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -238,7 +238,7 @@ public class DataTypeUtils {
public static UUID toUUID(Object value) {
if (value == null) {
- throw new IllegalTypeConversionException("Null values cannot be converted to a UUID");
+ return null;
}
if (value instanceof UUID) {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index 8854bf6e0c..c8ee7bd171 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -29,6 +29,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -91,7 +92,8 @@ public class GenericDataConverters {
@Override
public LocalTime convert(Object data) {
- return DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime();
+ Time time = DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null);
+ return time == null ? null : time.toLocalTime();
}
}
@@ -106,7 +108,7 @@ public class GenericDataConverters {
@Override
public LocalDateTime convert(Object data) {
final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
- return convertedTimestamp.toLocalDateTime();
+ return convertedTimestamp == null ? null : convertedTimestamp.toLocalDateTime();
}
}
@@ -121,7 +123,7 @@ public class GenericDataConverters {
@Override
public OffsetDateTime convert(Object data) {
final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
- return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC"));
+ return convertedTimestamp == null ? null : OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC"));
}
}
@@ -129,6 +131,9 @@ public class GenericDataConverters {
@Override
public byte[] convert(Object data) {
+ if (data == null) {
+ return null;
+ }
final UUID uuid = DataTypeUtils.toUUID(data);
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
@@ -147,6 +152,9 @@ public class GenericDataConverters {
@Override
public byte[] convert(Byte[] data) {
+ if (data == null) {
+ return null;
+ }
Validate.isTrue(data.length == length, String.format("Cannot write byte array of length %s as fixed[%s]", data.length, length));
return ArrayUtils.toPrimitive(data);
}
@@ -156,6 +164,9 @@ public class GenericDataConverters {
@Override
public ByteBuffer convert(Byte[] data) {
+ if (data == null) {
+ return null;
+ }
return ByteBuffer.wrap(ArrayUtils.toPrimitive(data));
}
}
@@ -171,6 +182,9 @@ public class GenericDataConverters {
@Override
public BigDecimal convert(Object data) {
+ if (data == null) {
+ return null;
+ }
if (data instanceof BigDecimal) {
BigDecimal bigDecimal = (BigDecimal) data;
Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
@@ -194,6 +208,9 @@ public class GenericDataConverters {
@Override
@SuppressWarnings("unchecked")
public List<T> convert(S[] data) {
+ if (data == null) {
+ return null;
+ }
final int numElements = data.length;
final List<T> result = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i += 1) {
@@ -219,6 +236,9 @@ public class GenericDataConverters {
@Override
@SuppressWarnings("unchecked")
public Map<TK, TV> convert(Map<SK, SV> data) {
+ if (data == null) {
+ return null;
+ }
final int mapSize = data.size();
final Object[] keyArray = data.keySet().toArray();
final Object[] valueArray = data.values().toArray();
@@ -253,6 +273,9 @@ public class GenericDataConverters {
@Override
public GenericRecord convert(Record data) {
+ if (data == null) {
+ return null;
+ }
final GenericRecord record = GenericRecord.create(schema);
for (DataConverter<?, ?> converter : converters) {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index 18690ffe78..baf220fea2 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -83,6 +83,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
@@ -471,6 +472,33 @@ public class TestIcebergRecordConverter {
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
}
+
+ // Test null values
+ for (String fieldName : record.getRawFieldNames()) {
+ record.setValue(fieldName, null);
+ }
+
+ genericRecord = recordConverter.convert(record);
+
+ writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+ results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
+
+ assertEquals(results.size(), 1);
+ resultRecord = results.get(0);
+ assertNull(resultRecord.get(0, String.class));
+ assertNull(resultRecord.get(1, Integer.class));
+ assertNull(resultRecord.get(2, Float.class));
+ assertNull(resultRecord.get(3, Long.class));
+ assertNull(resultRecord.get(4, Double.class));
+ assertNull(resultRecord.get(5, BigDecimal.class));
+ assertNull(resultRecord.get(6, Boolean.class));
+ assertNull(resultRecord.get(7));
+ assertNull(resultRecord.get(8));
+ assertNull(resultRecord.get(9, LocalDate.class));
+ assertNull(resultRecord.get(10, LocalTime.class));
+ assertNull(resultRecord.get(11, OffsetDateTime.class));
+ assertNull(resultRecord.get(14, Integer.class));
}
@DisabledOnOs(WINDOWS)