This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3ebe8f2983 NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg
3ebe8f2983 is described below
commit 3ebe8f2983f1214a2062da6d698d85d8587c8a50
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu May 11 12:10:47 2023 -0400
NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg
---
.../iceberg/converter/GenericDataConverters.java | 94 ++++++++++++++-----
.../iceberg/converter/IcebergRecordConverter.java | 15 +--
.../iceberg/TestIcebergRecordConverter.java | 101 +++++++++++++++++++++
.../src/test/resources/user.avsc | 2 +-
4 files changed, 183 insertions(+), 29 deletions(-)
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index 162d2a38b1..8854bf6e0c 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -19,15 +19,16 @@ package org.apache.nifi.processors.iceberg.converter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -47,45 +48,91 @@ import static
org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.cre
*/
public class GenericDataConverters {
- static class SameTypeConverter extends DataConverter<Object, Object> {
+ static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+ final Type.PrimitiveType targetType;
+ final DataType sourceType;
+
+ public PrimitiveTypeConverter(final Type.PrimitiveType type, final
DataType dataType) {
+ targetType = type;
+ sourceType = dataType;
+ }
@Override
public Object convert(Object data) {
- return data;
+ switch (targetType.typeId()) {
+ case BOOLEAN:
+ return DataTypeUtils.toBoolean(data, null);
+ case INTEGER:
+ return DataTypeUtils.toInteger(data, null);
+ case LONG:
+ return DataTypeUtils.toLong(data, null);
+ case FLOAT:
+ return DataTypeUtils.toFloat(data, null);
+ case DOUBLE:
+ return DataTypeUtils.toDouble(data, null);
+ case DATE:
+ return DataTypeUtils.toLocalDate(data, () ->
DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(),
ZoneId.systemDefault()), null);
+ case UUID:
+ return DataTypeUtils.toUUID(data);
+ case STRING:
+ default:
+ return DataTypeUtils.toString(data, () -> null);
+ }
}
}
- static class TimeConverter extends DataConverter<Time, LocalTime> {
+ static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+ private final String timeFormat;
+
+ public TimeConverter(final String format) {
+ this.timeFormat = format;
+ }
@Override
- public LocalTime convert(Time data) {
- return data.toLocalTime();
+ public LocalTime convert(Object data) {
+ return DataTypeUtils.toTime(data, () ->
DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime();
}
}
- static class TimestampConverter extends DataConverter<Timestamp,
LocalDateTime> {
+ static class TimestampConverter extends DataConverter<Object,
LocalDateTime> {
+
+ private final DataType dataType;
+
+ public TimestampConverter(final DataType dataType) {
+ this.dataType = dataType;
+ }
@Override
- public LocalDateTime convert(Timestamp data) {
- return data.toLocalDateTime();
+ public LocalDateTime convert(Object data) {
+ final Timestamp convertedTimestamp =
DataTypeUtils.toTimestamp(data, () ->
DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+ return convertedTimestamp.toLocalDateTime();
}
}
- static class TimestampWithTimezoneConverter extends
DataConverter<Timestamp, OffsetDateTime> {
+ static class TimestampWithTimezoneConverter extends DataConverter<Object,
OffsetDateTime> {
+
+ private final DataType dataType;
+
+ public TimestampWithTimezoneConverter(final DataType dataType) {
+ this.dataType = dataType;
+ }
@Override
- public OffsetDateTime convert(Timestamp data) {
- return OffsetDateTime.ofInstant(data.toInstant(),
ZoneId.of("UTC"));
+ public OffsetDateTime convert(Object data) {
+ final Timestamp convertedTimestamp =
DataTypeUtils.toTimestamp(data, () ->
DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+ return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(),
ZoneId.of("UTC"));
}
}
- static class UUIDtoByteArrayConverter extends DataConverter<UUID, byte[]> {
+ static class UUIDtoByteArrayConverter extends DataConverter<Object,
byte[]> {
@Override
- public byte[] convert(UUID data) {
+ public byte[] convert(Object data) {
+ final UUID uuid = DataTypeUtils.toUUID(data);
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
- byteBuffer.putLong(data.getMostSignificantBits());
- byteBuffer.putLong(data.getLeastSignificantBits());
+ byteBuffer.putLong(uuid.getMostSignificantBits());
+ byteBuffer.putLong(uuid.getLeastSignificantBits());
return byteBuffer.array();
}
}
@@ -113,7 +160,7 @@ public class GenericDataConverters {
}
}
- static class BigDecimalConverter extends DataConverter<BigDecimal,
BigDecimal> {
+ static class BigDecimalConverter extends DataConverter<Object, BigDecimal>
{
private final int precision;
private final int scale;
@@ -123,10 +170,15 @@ public class GenericDataConverters {
}
@Override
- public BigDecimal convert(BigDecimal data) {
- Validate.isTrue(data.scale() == scale, "Cannot write value as
decimal(%s,%s), wrong scale: %s", precision, scale, data);
- Validate.isTrue(data.precision() <= precision, "Cannot write value
as decimal(%s,%s), invalid precision: %s", precision, scale, data);
- return data;
+ public BigDecimal convert(Object data) {
+ if (data instanceof BigDecimal) {
+ BigDecimal bigDecimal = (BigDecimal) data;
+ Validate.isTrue(bigDecimal.scale() == scale, "Cannot write
value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale,
bigDecimal.scale(), data);
+ Validate.isTrue(bigDecimal.precision() <= precision, "Cannot
write value as decimal(%s,%s), invalid precision %s for value: %s",
+ precision, scale, bigDecimal.precision(), data);
+ return bigDecimal;
+ }
+ return DataTypeUtils.toBigDecimal(data, null);
}
}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
index 6a74eca4fd..33049123ef 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
@@ -46,7 +46,6 @@ import java.util.stream.Collectors;
public class IcebergRecordConverter {
private final DataConverter<Record, GenericRecord> converter;
-
public GenericRecord convert(Record record) {
return converter.convert(record);
}
@@ -85,21 +84,21 @@ public class IcebergRecordConverter {
case DOUBLE:
case DATE:
case STRING:
- return new GenericDataConverters.SameTypeConverter();
+ return new
GenericDataConverters.PrimitiveTypeConverter(type, dataType);
case TIME:
- return new GenericDataConverters.TimeConverter();
+ return new
GenericDataConverters.TimeConverter(dataType.getFormat());
case TIMESTAMP:
final Types.TimestampType timestampType =
(Types.TimestampType) type;
if (timestampType.shouldAdjustToUTC()) {
- return new
GenericDataConverters.TimestampWithTimezoneConverter();
+ return new
GenericDataConverters.TimestampWithTimezoneConverter(dataType);
}
- return new GenericDataConverters.TimestampConverter();
+ return new
GenericDataConverters.TimestampConverter(dataType);
case UUID:
final UUIDDataType uuidType = (UUIDDataType) dataType;
if (uuidType.getFileFormat() == FileFormat.PARQUET) {
return new
GenericDataConverters.UUIDtoByteArrayConverter();
}
- return new GenericDataConverters.SameTypeConverter();
+ return new
GenericDataConverters.PrimitiveTypeConverter(type, dataType);
case FIXED:
final Types.FixedType fixedType = (Types.FixedType)
type;
return new
GenericDataConverters.FixedConverter(fixedType.length());
@@ -167,7 +166,9 @@ public class IcebergRecordConverter {
return new RecordTypeWithFieldNameMapper(new
Schema(schema.findField(fieldId).type().asStructType().fields()),
(RecordDataType) field.getDataType());
}
- if
(field.getDataType().getFieldType().equals(RecordFieldType.UUID)) {
+ // If the source field or target field is of type UUID, create a
UUIDDataType from it
+ if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)
+ || schema.findField(fieldId).type().typeId() ==
Type.TypeID.UUID) {
return new UUIDDataType(field.getDataType(), fileFormat);
}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index f69946e2b8..18690ffe78 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -67,6 +67,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
@@ -143,6 +144,23 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
+ private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
+ Types.NestedField.optional(0, "string", Types.StringType.get()),
+ Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "float", Types.FloatType.get()),
+ Types.NestedField.optional(3, "long", Types.LongType.get()),
+ Types.NestedField.optional(4, "double", Types.DoubleType.get()),
+ Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10,
2)),
+ Types.NestedField.optional(7, "fixed",
Types.FixedType.ofLength(5)),
+ Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
+ Types.NestedField.optional(9, "date", Types.DateType.get()),
+ Types.NestedField.optional(10, "time", Types.TimeType.get()),
+ Types.NestedField.optional(11, "timestamp",
Types.TimestampType.withZone()),
+ Types.NestedField.optional(12, "timestampTz",
Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
+ Types.NestedField.optional(14, "choice", Types.IntegerType.get())
+ );
+
private static final Schema CASE_INSENSITIVE_SCHEMA = new Schema(
Types.NestedField.optional(0, "FIELD1", Types.StringType.get()),
Types.NestedField.optional(1, "Field2", Types.StringType.get()),
@@ -221,6 +239,26 @@ public class TestIcebergRecordConverter {
return new SimpleRecordSchema(fields);
}
+ private static RecordSchema getPrimitivesAsCompatiblesSchema() {
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("string",
RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("integer",
RecordFieldType.SHORT.getDataType()));
+ fields.add(new RecordField("float",
RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("long", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("double",
RecordFieldType.FLOAT.getDataType()));
+ fields.add(new RecordField("decimal",
RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("fixed",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ fields.add(new RecordField("binary",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ fields.add(new RecordField("date",
RecordFieldType.STRING.getDataType("yyyy-MM-dd")));
+ fields.add(new RecordField("time",
RecordFieldType.STRING.getDataType("hh:mm:ss.SSS")));
+ fields.add(new RecordField("timestamp",
RecordFieldType.STRING.getDataType("yyyy-MM-dd hh:mm:ss.SSSZ")));
+ fields.add(new RecordField("timestampTz",
RecordFieldType.STRING.getDataType("yyyy-MM-dd hh:mm:ss.SSSZ")));
+ fields.add(new RecordField("uuid",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("choice",
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(),
RecordFieldType.INT.getDataType())));
+
+ return new SimpleRecordSchema(fields);
+ }
+
private static RecordSchema getCaseInsensitiveSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("field1",
RecordFieldType.STRING.getDataType()));
@@ -331,6 +369,27 @@ public class TestIcebergRecordConverter {
return new MapRecord(getPrimitivesSchema(), values);
}
+ private static Record setupCompatiblePrimitivesTestRecord() {
+
+ Map<String, Object> values = new HashMap<>();
+ values.put("string", 123);
+ values.put("integer", 8);
+ values.put("float", 1.23456);
+ values.put("long", 42L);
+ values.put("double", 3.14159);
+ values.put("decimal", 12345678.12);
+ values.put("fixed", "hello".getBytes());
+ values.put("binary", "hello".getBytes());
+ values.put("date", "2017-04-04");
+ values.put("time", "14:20:33.000");
+ values.put("timestamp", "2017-04-04 14:20:33.789-0500");
+ values.put("timestampTz", "2017-04-04 14:20:33.789-0500");
+ values.put("uuid", "0000-00-00-00-000000");
+ values.put("choice", "10");
+
+ return new MapRecord(getPrimitivesAsCompatiblesSchema(), values);
+ }
+
private static Record setupCaseInsensitiveTestRecord() {
Map<String, Object> values = new HashMap<>();
values.put("field1", "Text1");
@@ -414,6 +473,48 @@ public class TestIcebergRecordConverter {
}
}
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+ public void testCompatiblePrimitives(FileFormat format) throws IOException
{
+ RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema();
+ Record record = setupCompatiblePrimitivesTestRecord();
+
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFrom(format,
COMPATIBLE_PRIMITIVES_SCHEMA, tempFile.toInputFile());
+
+ assertEquals(results.size(), 1);
+ GenericRecord resultRecord = results.get(0);
+
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33,
789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime,
ZoneOffset.ofHours(-5));
+ LocalDateTime expectedLocalDateTimestamp =
offsetDateTime.atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
+
+ assertEquals("123", resultRecord.get(0, String.class));
+ assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class));
+ assertEquals(Float.valueOf(1.23456F), resultRecord.get(2,
Float.class));
+ assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
+ assertEquals(Double.valueOf(3.141590118408203), resultRecord.get(4,
Double.class));
+ assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5,
BigDecimal.class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(6, byte[].class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(7, ByteBuffer.class).array());
+ assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(8,
LocalDate.class));
+ assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(9,
LocalTime.class));
+ assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(10, OffsetDateTime.class));
+ assertEquals(expectedLocalDateTimestamp, resultRecord.get(11,
LocalDateTime.class));
+ assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class));
+
+ if (format.equals(PARQUET)) {
+ assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0}, resultRecord.get(12, byte[].class));
+ } else {
+ assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(12, UUID.class));
+ }
+ }
+
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
index 5cdac5fe8c..c537a9e496 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
@@ -19,7 +19,7 @@
"type": "record",
"name": "User",
"fields": [
- {"name": "id", "type": ["int", "null"]},
+ {"name": "id", "type": ["long", "null"]},
{"name": "name", "type": ["string", "null"]},
{"name": "department", "type": ["string", "null"]}
]