This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c29a744644 NIFI-12847: Add Enum data type handling to Iceberg record converter
c29a744644 is described below
commit c29a744644134bb122dbbddc3eb3d6ba3d98508a
Author: Mark Bathori <[email protected]>
AuthorDate: Tue Feb 27 13:47:19 2024 +0100
NIFI-12847: Add Enum data type handling to Iceberg record converter
Signed-off-by: Pierre Villard <[email protected]>
This closes #8453.
---
.../serialization/record/util/DataTypeUtils.java | 2 +-
.../iceberg/converter/ArrayElementGetter.java | 4 +++
.../iceberg/converter/RecordFieldGetter.java | 4 +++
.../iceberg/TestIcebergRecordConverter.java | 6 ++++-
.../iceberg/TestPutIcebergWithHiveCatalog.java | 2 +-
.../src/test/resources/user.avsc | 29 ++++++++++++++++------
6 files changed, 36 insertions(+), 11 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 000435e410..d45b5053c3 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1103,7 +1103,7 @@ public class DataTypeUtils {
return enumType.getEnums() != null &&
enumType.getEnums().contains(value);
}
- private static Object toEnum(Object value, EnumDataType dataType, String
fieldName) {
+ public static Object toEnum(Object value, EnumDataType dataType, String
fieldName) {
if(dataType.getEnums() != null && dataType.getEnums().contains(value))
{
return value.toString();
}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
index f03c8f64a0..7e0be786e7 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
@@ -22,6 +22,7 @@ import
org.apache.nifi.serialization.record.field.FieldConverter;
import
org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@@ -96,6 +97,9 @@ public class ArrayElementGetter {
return converter.convertField(element,
Optional.ofNullable(dataType.getFormat()), ARRAY_FIELD_NAME);
};
break;
+ case ENUM:
+ elementGetter = element -> DataTypeUtils.toEnum(element,
(EnumDataType) dataType, ARRAY_FIELD_NAME);
+ break;
case UUID:
elementGetter = DataTypeUtils::toUUID;
break;
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
index 2b7c7c09da..d0f9d55d87 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
@@ -23,6 +23,7 @@ import
org.apache.nifi.serialization.record.field.FieldConverter;
import
org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@@ -101,6 +102,9 @@ public class RecordFieldGetter {
case UUID:
fieldGetter = record ->
DataTypeUtils.toUUID(record.getValue(fieldName));
break;
+ case ENUM:
+ fieldGetter = record ->
DataTypeUtils.toEnum(record.getValue(fieldName), (EnumDataType) dataType,
fieldName);
+ break;
case ARRAY:
fieldGetter = record ->
DataTypeUtils.toArray(record.getValue(fieldName), fieldName, ((ArrayDataType)
dataType).getElementType());
break;
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index b067a9202d..48915ba7f2 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -166,7 +166,8 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(11, "timestamp",
Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz",
Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
- Types.NestedField.optional(14, "choice", Types.IntegerType.get())
+ Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
+ Types.NestedField.optional(15, "enum", Types.StringType.get())
);
private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new
Schema(
@@ -294,6 +295,7 @@ public class TestIcebergRecordConverter {
fields.add(new RecordField("timestampTz",
RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("uuid",
RecordFieldType.UUID.getDataType()));
fields.add(new RecordField("choice",
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(),
RecordFieldType.INT.getDataType())));
+ fields.add(new RecordField("enum",
RecordFieldType.ENUM.getEnumDataType(Arrays.asList("red", "blue", "yellow"))));
return new SimpleRecordSchema(fields);
}
@@ -469,6 +471,7 @@ public class TestIcebergRecordConverter {
values.put("timestampTz", Timestamp.valueOf(LOCAL_DATE_TIME));
values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
values.put("choice", "10");
+ values.put("enum", "blue");
return new MapRecord(getPrimitivesSchema(), values);
}
@@ -590,6 +593,7 @@ public class TestIcebergRecordConverter {
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(11, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(12,
LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+ assertEquals("blue", resultRecord.get(15, String.class));
if (format.equals(PARQUET)) {
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0}, resultRecord.get(13, byte[].class));
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index c61925050c..d2b942cf58 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -109,7 +109,7 @@ public class TestPutIcebergWithHiveCatalog {
RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
for (RecordField recordField : recordSchema.getFields()) {
- readerFactory.addSchemaField(recordField.getFieldName(),
recordField.getDataType().getFieldType(), recordField.isNullable());
+ readerFactory.addSchemaField(recordField);
}
readerFactory.addRecord(0, "John", "Finance");
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
index c537a9e496..799c0023b4 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
@@ -15,12 +15,25 @@
* limitations under the License.
*/
{
- "namespace": "nifi",
- "type": "record",
- "name": "User",
- "fields": [
- {"name": "id", "type": ["long", "null"]},
- {"name": "name", "type": ["string", "null"]},
- {"name": "department", "type": ["string", "null"]}
- ]
+ "namespace": "nifi",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {
+ "name": "id",
+ "type": ["long", "null"]
+ },
+ {
+ "name": "name",
+ "type": ["string", "null"]
+ },
+ {
+ "name": "department",
+ "type": {
+ "name": "Department",
+ "type": "enum",
+ "symbols": ["Finance", "Marketing", "Sales"]
+ }
+ }
+ ]
}