This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 8f4d202271 NIFI-11739 - Add ability to ignore missing fields in
PutIceberg
8f4d202271 is described below
commit 8f4d202271727dfb22be8cb522e4c2e4edf473a1
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Jun 21 20:39:43 2023 -0400
NIFI-11739 - Add ability to ignore missing fields in PutIceberg
Signed-off-by: Pierre Villard <[email protected]>
This closes #7421.
---
.../iceberg/UnmatchedColumnBehavior.java | 56 +++++
.../iceberg/converter/GenericDataConverters.java | 65 +++++-
.../iceberg/converter/IcebergRecordConverter.java | 50 ++++-
.../apache/nifi/processors/iceberg/PutIceberg.java | 14 +-
.../processors/iceberg/TestDataFileActions.java | 6 +-
.../iceberg/TestIcebergRecordConverter.java | 246 ++++++++++++++++++---
.../iceberg/TestPutIcebergWithHadoopCatalog.java | 29 ++-
.../iceberg/TestPutIcebergWithHiveCatalog.java | 36 ++-
8 files changed, 418 insertions(+), 84 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java
new file mode 100644
index 0000000000..094f0daf60
--- /dev/null
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Policies for handling Iceberg table columns that have no matching field in an incoming record.
+ * <p>
+ * {@link #getValue()} returns the constant's name, so a stored property value can be resolved back to a
+ * constant with {@link #valueOf(String)}.
+ */
+public enum UnmatchedColumnBehavior implements DescribedValue {
+    IGNORE_UNMATCHED_COLUMN("Ignore Unmatched Columns",
+            "Any column in the database that does not have a field in the document will be assumed to not be required. No notification will be logged"),
+
+    WARNING_UNMATCHED_COLUMN("Warn on Unmatched Columns",
+            "Any column in the database that does not have a field in the document will be assumed to not be required. A warning will be logged"),
+
+    FAIL_UNMATCHED_COLUMN("Fail on Unmatched Columns",
+            "A flow will fail if any column in the database does not have a field in the document. An error will be logged");
+
+    private final String displayName;
+    private final String description;
+
+    /**
+     * @param displayName human-readable name shown for this option
+     * @param description explanation of what the option does
+     */
+    UnmatchedColumnBehavior(final String displayName, final String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    /**
+     * @return the enum constant name, used as the persisted property value
+     */
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index c8ee7bd171..794d2c5f37 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -21,9 +21,11 @@ import org.apache.commons.lang3.Validate;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -265,9 +267,16 @@ public class GenericDataConverters {
for (DataConverter<?, ?> converter : converters) {
final Optional<RecordField> recordField =
recordSchema.getField(converter.getSourceFieldName());
- final RecordField field = recordField.get();
- // creates a record field accessor for every data converter
- getters.put(converter.getTargetFieldName(),
createFieldGetter(field.getDataType(), field.getFieldName(),
field.isNullable()));
+ if (recordField.isEmpty()) {
+ final Types.NestedField missingField =
schema.field(converter.getTargetFieldName());
+ if (missingField != null) {
+ getters.put(converter.getTargetFieldName(),
createFieldGetter(convertSchemaTypeToDataType(missingField.type()),
missingField.name(), missingField.isOptional()));
+ }
+ } else {
+ final RecordField field = recordField.get();
+ // creates a record field accessor for every data converter
+ getters.put(converter.getTargetFieldName(),
createFieldGetter(field.getDataType(), field.getFieldName(),
field.isNullable()));
+ }
}
}
@@ -290,4 +299,54 @@ public class GenericDataConverters {
return converter.convert((S)
getters.get(converter.getTargetFieldName()).getFieldOrNull(record));
}
}
+
+    /**
+     * Converts an Iceberg type into the closest matching NiFi record {@link DataType}.
+     * <p>
+     * This is used when a table column has no corresponding field in the incoming record. In that case a field
+     * getter is still created from the table's own type information. Mapping notes:
+     * <ul>
+     *     <li>{@code FIXED} and {@code BINARY} both become an array of bytes.</li>
+     *     <li>{@code DECIMAL} keeps the precision and scale declared on the Iceberg type.</li>
+     *     <li>{@code TIMESTAMP} becomes a NiFi timestamp whether or not the Iceberg type is zoned.</li>
+     *     <li>{@code STRUCT} becomes a record type; each field is nullable when the Iceberg field is optional.</li>
+     *     <li>{@code MAP} is built from the value type only; the key type is not carried over.</li>
+     * </ul>
+     *
+     * @param schemaType the Iceberg type to convert
+     * @return the equivalent NiFi data type
+     * @throws IllegalArgumentException if the Iceberg type has no supported NiFi equivalent
+     */
+    public static DataType convertSchemaTypeToDataType(Type schemaType) {
+        switch (schemaType.typeId()) {
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case INTEGER:
+                return RecordFieldType.INT.getDataType();
+            case LONG:
+                return RecordFieldType.LONG.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case DATE:
+                return RecordFieldType.DATE.getDataType();
+            case TIME:
+                return RecordFieldType.TIME.getDataType();
+            case TIMESTAMP:
+                return RecordFieldType.TIMESTAMP.getDataType();
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case UUID:
+                return RecordFieldType.UUID.getDataType();
+            case FIXED:
+            case BINARY:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case DECIMAL:
+                // Keep the column's declared precision and scale rather than the default decimal type
+                final Types.DecimalType decimalType = (Types.DecimalType) schemaType;
+                return RecordFieldType.DECIMAL.getDecimalDataType(decimalType.precision(), decimalType.scale());
+            case STRUCT:
+                // Build a record type from the struct type, converting each nested field recursively
+                Types.StructType structType = schemaType.asStructType();
+                List<Types.NestedField> fields = structType.fields();
+                List<RecordField> recordFields = new ArrayList<>(fields.size());
+                for (Types.NestedField field : fields) {
+                    DataType dataType = convertSchemaTypeToDataType(field.type());
+                    recordFields.add(new RecordField(field.name(), dataType, field.isOptional()));
+                }
+                RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+            case LIST:
+                // Build an array type from the element type
+                Types.ListType listType = schemaType.asListType();
+                return RecordFieldType.ARRAY.getArrayDataType(convertSchemaTypeToDataType(listType.elementType()), listType.isElementOptional());
+            case MAP:
+                // Build a map type from the value type
+                Types.MapType mapType = schemaType.asMapType();
+                return RecordFieldType.MAP.getMapDataType(convertSchemaTypeToDataType(mapType.valueType()), mapType.isValueOptional());
+        }
+        throw new IllegalArgumentException("Invalid or unsupported type: " + schemaType);
+    }
}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
index 33049123ef..49ff37f475 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
@@ -25,6 +25,8 @@ import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.iceberg.UnmatchedColumnBehavior;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -46,19 +48,26 @@ import java.util.stream.Collectors;
public class IcebergRecordConverter {
private final DataConverter<Record, GenericRecord> converter;
+ public final UnmatchedColumnBehavior unmatchedColumnBehavior;
+ public ComponentLog logger;
+
+    /**
+     * Converts a NiFi record into an Iceberg {@link GenericRecord} using the converter built in the constructor.
+     */
public GenericRecord convert(Record record) {
return converter.convert(record);
}
+
@SuppressWarnings("unchecked")
- public IcebergRecordConverter(Schema schema, RecordSchema recordSchema,
FileFormat fileFormat) {
- this.converter = (DataConverter<Record, GenericRecord>)
IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema),
fileFormat);
+ public IcebergRecordConverter(Schema schema, RecordSchema recordSchema,
FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior,
ComponentLog logger) {
+ this.converter = (DataConverter<Record, GenericRecord>)
IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema),
fileFormat, unmatchedColumnBehavior, logger);
+ this.unmatchedColumnBehavior = unmatchedColumnBehavior;
+ this.logger = logger;
}
private static class IcebergSchemaVisitor extends
SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
- public static DataConverter<?, ?> visit(Schema schema, RecordDataType
recordDataType, FileFormat fileFormat) {
- return visit(schema, new RecordTypeWithFieldNameMapper(schema,
recordDataType), new IcebergSchemaVisitor(), new
IcebergPartnerAccessors(schema, fileFormat));
+        /**
+         * Visits the Iceberg schema with the NiFi record type as its partner and returns the root data converter.
+         */
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) {
+            // Wrap the record type so Iceberg column names can be mapped onto NiFi field names
+            final RecordTypeWithFieldNameMapper partnerType = new RecordTypeWithFieldNameMapper(schema, recordDataType);
+            final IcebergSchemaVisitor schemaVisitor = new IcebergSchemaVisitor();
+            final IcebergPartnerAccessors partnerAccessors = new IcebergPartnerAccessors(schema, fileFormat, unmatchedColumnBehavior, logger);
+            return visit(schema, partnerType, schemaVisitor, partnerAccessors);
}
@Override
@@ -123,8 +132,10 @@ public class IcebergRecordConverter {
// set NiFi schema field names (sourceFieldName) in the data
converters
for (DataConverter<?, ?> converter : converters) {
final Optional<String> mappedFieldName =
recordType.getNameMapping(converter.getTargetFieldName());
- final Optional<RecordField> recordField =
recordSchema.getField(mappedFieldName.get());
- converter.setSourceFieldName(recordField.get().getFieldName());
+ if (mappedFieldName.isPresent()) {
+ final Optional<RecordField> recordField =
recordSchema.getField(mappedFieldName.get());
+
converter.setSourceFieldName(recordField.get().getFieldName());
+ }
}
return new GenericDataConverters.RecordConverter(converters,
recordSchema, type);
@@ -144,10 +155,14 @@ public class IcebergRecordConverter {
public static class IcebergPartnerAccessors implements
SchemaWithPartnerVisitor.PartnerAccessors<DataType> {
private final Schema schema;
private final FileFormat fileFormat;
+ private final UnmatchedColumnBehavior unmatchedColumnBehavior;
+ private final ComponentLog logger;
- IcebergPartnerAccessors(Schema schema, FileFormat fileFormat) {
+        // Keeps the Iceberg schema, file format, unmatched-column behavior and logger used when matching
+        // table columns to record fields.
+ IcebergPartnerAccessors(Schema schema, FileFormat fileFormat,
UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) {
this.schema = schema;
this.fileFormat = fileFormat;
+ this.unmatchedColumnBehavior = unmatchedColumnBehavior;
+ this.logger = logger;
}
@Override
@@ -156,8 +171,25 @@ public class IcebergRecordConverter {
final RecordTypeWithFieldNameMapper recordType =
(RecordTypeWithFieldNameMapper) dataType;
final Optional<String> mappedFieldName =
recordType.getNameMapping(name);
- Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot
find field with name '%s' in the record schema", name));
-
+ if
(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior))
{
+ Validate.isTrue(mappedFieldName.isPresent(),
String.format("Cannot find field with name '%s' in the record schema", name));
+ }
+ if (mappedFieldName.isEmpty()) {
+ if
(UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior))
{
+ if (logger != null) {
+ logger.warn("Cannot find field with name '" + name +
"' in the record schema, using the target schema for datatype and a null
value");
+ }
+ }
+ // If the field is missing, use the expected type from the
schema (converted to a DataType)
+ final Types.NestedField schemaField =
schema.findField(fieldId);
+ final Type schemaFieldType = schemaField.type();
+ if (schemaField.isRequired()) {
+ // Iceberg requires a non-null value for required fields
+ throw new IllegalArgumentException("Iceberg requires a
non-null value for required fields, field: "
+ + schemaField.name() + ", type: " +
schemaFieldType);
+ }
+ return
GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType);
+ }
final Optional<RecordField> recordField =
recordType.getChildSchema().getField(mappedFieldName.get());
final RecordField field = recordField.get();
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 02bd0b074f..22b1ec5507 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -108,6 +108,14 @@ public class PutIceberg extends AbstractIcebergProcessor {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
+    /**
+     * Selects how the processor reacts when an incoming record has no field for some of the target table's columns.
+     * The default is to fail; the stored value is an {@link UnmatchedColumnBehavior} constant name.
+     */
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+            .name("unmatched-column-behavior")
+            .displayName("Unmatched Column Behavior")
+            .required(true)
+            .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.")
+            .allowableValues(UnmatchedColumnBehavior.class)
+            .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
+            .build();
static final PropertyDescriptor FILE_FORMAT = new
PropertyDescriptor.Builder()
.name("file-format")
.displayName("File Format")
@@ -178,6 +186,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
CATALOG,
CATALOG_NAMESPACE,
TABLE_NAME,
+ UNMATCHED_COLUMN_BEHAVIOR,
FILE_FORMAT,
MAXIMUM_FILE_SIZE,
KERBEROS_USER_SERVICE,
@@ -256,8 +265,10 @@ public class PutIceberg extends AbstractIcebergProcessor {
final FileFormat format = getFileFormat(table.properties(),
fileFormat);
final IcebergTaskWriterFactory taskWriterFactory = new
IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize);
taskWriter = taskWriterFactory.create();
+ final UnmatchedColumnBehavior unmatchedColumnBehavior =
+
UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
- final IcebergRecordConverter recordConverter = new
IcebergRecordConverter(table.schema(), reader.getSchema(), format);
+ final IcebergRecordConverter recordConverter = new
IcebergRecordConverter(table.schema(), reader.getSchema(), format,
unmatchedColumnBehavior, getLogger());
Record record;
while ((record = reader.nextRecord()) != null) {
@@ -353,5 +364,4 @@ public class PutIceberg extends AbstractIcebergProcessor {
.retry(3)
.run(file -> table.io().deleteFile(file.path().toString()));
}
-
}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
index 4e535c3f8a..cc45705eb3 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -32,6 +32,8 @@ import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
@@ -79,10 +81,12 @@ public class TestDataFileActions {
);
private PutIceberg icebergProcessor;
+ private ComponentLog logger;
@BeforeEach
public void setUp() {
icebergProcessor = new PutIceberg();
+        // Logger passed to IcebergRecordConverter by the tests in this class
+ logger = new MockComponentLogger();
}
@DisabledOnOs(WINDOWS)
@@ -103,7 +107,7 @@ public class TestDataFileActions {
IcebergTaskWriterFactory taskWriterFactory = new
IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET,
null);
TaskWriter<Record> taskWriter = taskWriterFactory.create();
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
for (MapRecord record : recordList) {
taskWriter.write(recordConverter.convert(record));
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index baf220fea2..c064c723f0 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -37,6 +37,8 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter;
@@ -57,6 +59,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
@@ -92,9 +95,12 @@ public class TestIcebergRecordConverter {
private OutputFile tempFile;
+ private ComponentLog logger;
+
@BeforeEach
public void setUp() throws Exception {
tempFile = Files.localOutput(createTempFile("test", null));
+        // Shared logger for tests that do not create their own MockComponentLogger
+ logger = new MockComponentLogger();
}
@AfterEach
@@ -145,6 +151,24 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
+    // Primitive Iceberg schema in which "integer", "float" and "long" are required columns.
+    // testPrimitivesMissingRequiredFields uses it to check that a required column with no record field
+    // is rejected even when unmatched columns are ignored.
+    // NOTE(review): "timestamp" is declared withZone() and "timestampTz" withoutZone(); the names look
+    // swapped. Presumably this mirrors PRIMITIVES_SCHEMA - confirm before renaming.
+ private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new
Schema(
+ Types.NestedField.optional(0, "string", Types.StringType.get()),
+ Types.NestedField.required(1, "integer", Types.IntegerType.get()),
+ Types.NestedField.required(2, "float", Types.FloatType.get()),
+ Types.NestedField.required(3, "long", Types.LongType.get()),
+ Types.NestedField.optional(4, "double", Types.DoubleType.get()),
+ Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10,
2)),
+ Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
+ Types.NestedField.optional(7, "fixed",
Types.FixedType.ofLength(5)),
+ Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
+ Types.NestedField.optional(9, "date", Types.DateType.get()),
+ Types.NestedField.optional(10, "time", Types.TimeType.get()),
+ Types.NestedField.optional(11, "timestamp",
Types.TimestampType.withZone()),
+ Types.NestedField.optional(12, "timestampTz",
Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
+ Types.NestedField.optional(14, "choice", Types.IntegerType.get())
+ );
+
private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
@@ -240,6 +264,24 @@ public class TestIcebergRecordConverter {
return new SimpleRecordSchema(fields);
}
+ private static RecordSchema getPrimitivesSchemaMissingFields() {
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("string",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("double",
RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("decimal",
RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
+ fields.add(new RecordField("boolean",
RecordFieldType.BOOLEAN.getDataType()));
+ fields.add(new RecordField("fixed",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ fields.add(new RecordField("binary",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ fields.add(new RecordField("date",
RecordFieldType.DATE.getDataType()));
+ fields.add(new RecordField("time",
RecordFieldType.TIME.getDataType()));
+ fields.add(new RecordField("timestamp",
RecordFieldType.TIMESTAMP.getDataType()));
+ fields.add(new RecordField("timestampTz",
RecordFieldType.TIMESTAMP.getDataType()));
+ fields.add(new RecordField("uuid",
RecordFieldType.UUID.getDataType()));
+ fields.add(new RecordField("choice",
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(),
RecordFieldType.INT.getDataType())));
+
+ return new SimpleRecordSchema(fields);
+ }
+
private static RecordSchema getPrimitivesAsCompatiblesSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string",
RecordFieldType.INT.getDataType()));
@@ -370,6 +412,29 @@ public class TestIcebergRecordConverter {
return new MapRecord(getPrimitivesSchema(), values);
}
+ private static Record setupPrimitivesTestRecordMissingFields() {
+ LocalDate localDate = LocalDate.of(2017, 4, 4);
+ LocalTime localTime = LocalTime.of(14, 20, 33);
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33,
789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime,
ZoneOffset.ofHours(-5));
+
+ Map<String, Object> values = new HashMap<>();
+ values.put("string", "Test String");
+ values.put("double", 3.14159D);
+ values.put("decimal", new BigDecimal("12345678.12"));
+ values.put("boolean", true);
+ values.put("fixed", "hello".getBytes());
+ values.put("binary", "hello".getBytes());
+ values.put("date", localDate);
+ values.put("time", Time.valueOf(localTime));
+ values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
+ values.put("timestampTz", Timestamp.valueOf(localDateTime));
+ values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
+ values.put("choice", "10");
+
+ return new MapRecord(getPrimitivesSchemaMissingFields(), values);
+ }
+
private static Record setupCompatiblePrimitivesTestRecord() {
Map<String, Object> values = new HashMap<>();
@@ -439,7 +504,7 @@ public class TestIcebergRecordConverter {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord();
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
@@ -472,6 +537,54 @@ public class TestIcebergRecordConverter {
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(13, UUID.class));
}
+ }
+
+ @DisabledOnOs(WINDOWS)
+ @ParameterizedTest
+ @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+ public void testPrimitivesIgnoreMissingFields(FileFormat format) throws
IOException {
+ RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+ Record record = setupPrimitivesTestRecordMissingFields();
+ MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger);
+ GenericRecord genericRecord = recordConverter.convert(record);
+
+ writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+ List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA,
tempFile.toInputFile());
+
+ assertEquals(results.size(), 1);
+ GenericRecord resultRecord = results.get(0);
+
+ LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33,
789000000);
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime,
ZoneOffset.ofHours(-5));
+
+ assertEquals("Test String", resultRecord.get(0, String.class));
+ assertNull(resultRecord.get(1, Integer.class));
+ assertNull(resultRecord.get(2, Float.class));
+ assertNull(resultRecord.get(3, Long.class));
+ assertEquals(Double.valueOf(3.14159D), resultRecord.get(4,
Double.class));
+ assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5,
BigDecimal.class));
+ assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(7, byte[].class));
+ assertArrayEquals(new byte[]{104, 101, 108, 108, 111},
resultRecord.get(8, ByteBuffer.class).array());
+ assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9,
LocalDate.class));
+ assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10,
LocalTime.class));
+ assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
resultRecord.get(11, OffsetDateTime.class));
+ assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000),
resultRecord.get(12, LocalDateTime.class));
+ assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+
+ if (format.equals(FileFormat.PARQUET)) {
+ // Parquet uses a conversion to the byte values of numeric
characters such as "0" -> byte value 0
+ UUID uuid = UUID.fromString("0000-00-00-00-000000");
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+ byteBuffer.putLong(uuid.getMostSignificantBits());
+ byteBuffer.putLong(uuid.getLeastSignificantBits());
+ assertArrayEquals(byteBuffer.array(), resultRecord.get(13,
byte[].class));
+ } else {
+ assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(13, UUID.class));
+ }
// Test null values
for (String fieldName : record.getRawFieldNames()) {
@@ -504,11 +617,81 @@ public class TestIcebergRecordConverter {
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
- public void testCompatiblePrimitives(FileFormat format) throws IOException
{
+    public void testPrimitivesMissingRequiredFields(FileFormat format) {
+        // Ignoring unmatched columns must not allow a required Iceberg column to be left without a value
+        final RecordSchema schemaWithoutRequiredColumns = getPrimitivesSchemaMissingFields();
+        final MockComponentLogger componentLogger = new MockComponentLogger();
+
+        assertThrows(IllegalArgumentException.class, () -> new IcebergRecordConverter(
+                PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, schemaWithoutRequiredColumns, format,
+                UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, componentLogger));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesWarnMissingFields(FileFormat format) throws IOException {
+        // The record schema lacks the "integer", "float" and "long" columns. With WARNING_UNMATCHED_COLUMN the
+        // conversion still succeeds and those columns are written as null.
+        // NOTE(review): the warning itself is not asserted; consider a logger that captures messages.
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        Record record = setupPrimitivesTestRecordMissingFields();
+        MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN, mockComponentLogger);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
+
+        // JUnit's assertEquals takes (expected, actual)
+        assertEquals(1, results.size());
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+        assertEquals("Test String", resultRecord.get(0, String.class));
+        // Unmatched columns are filled with null
+        assertNull(resultRecord.get(1, Integer.class));
+        assertNull(resultRecord.get(2, Float.class));
+        assertNull(resultRecord.get(3, Long.class));
+        assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
+        assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
+        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+
+        if (format.equals(FileFormat.PARQUET)) {
+            // Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0
+            UUID uuid = UUID.fromString("0000-00-00-00-000000");
+            ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+            byteBuffer.putLong(uuid.getMostSignificantBits());
+            byteBuffer.putLong(uuid.getLeastSignificantBits());
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
+        } else {
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
+        }
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesFailMissingFields(FileFormat format) {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+        // With FAIL_UNMATCHED_COLUMN the missing "integer", "float" and "long" columns are rejected while the
+        // converter is being constructed, before any record is converted (hence no I/O and no IOException).
+        assertThrows(IllegalArgumentException.class,
+                () -> new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, mockComponentLogger));
+    }
+
+ @DisabledOnOs(WINDOWS)
+ @Test
+ public void testCompatiblePrimitives() throws IOException {
RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema();
Record record = setupCompatiblePrimitivesTestRecord();
+ final FileFormat format = PARQUET;
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile);
@@ -536,21 +719,17 @@ public class TestIcebergRecordConverter {
assertEquals(expectedLocalDateTimestamp, resultRecord.get(11,
LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class));
- if (format.equals(PARQUET)) {
- assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0}, resultRecord.get(12, byte[].class));
- } else {
- assertEquals(UUID.fromString("0000-00-00-00-000000"),
resultRecord.get(12, UUID.class));
- }
+ assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0}, resultRecord.get(12, byte[].class));
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
- public void testStruct(FileFormat format) throws IOException {
+ @Test
+ public void testStruct() throws IOException {
RecordSchema nifiSchema = getStructSchema();
Record record = setupStructTestRecord();
+ final FileFormat format = FileFormat.ORC;
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, STRUCT_SCHEMA, genericRecord, tempFile);
@@ -574,13 +753,13 @@ public class TestIcebergRecordConverter {
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
- public void testList(FileFormat format) throws IOException {
+ @Test
+ public void testList() throws IOException {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
+ final FileFormat format = FileFormat.AVRO;
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, LIST_SCHEMA, genericRecord, tempFile);
@@ -593,7 +772,7 @@ public class TestIcebergRecordConverter {
assertEquals(1, resultRecord.size());
assertInstanceOf(List.class, resultRecord.get(0));
- List nestedList = resultRecord.get(0, List.class);
+ List<?> nestedList = resultRecord.get(0, List.class);
assertEquals(2, nestedList.size());
assertInstanceOf(List.class, nestedList.get(0));
@@ -604,13 +783,13 @@ public class TestIcebergRecordConverter {
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
- public void testMap(FileFormat format) throws IOException {
+ @Test
+ public void testMap() throws IOException {
RecordSchema nifiSchema = getMapSchema();
Record record = setupMapTestRecord();
+ final FileFormat format = PARQUET;
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, MAP_SCHEMA, genericRecord, tempFile);
@@ -636,20 +815,21 @@ public class TestIcebergRecordConverter {
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testSchemaMismatch(FileFormat format) {
- RecordSchema nifiSchema = getPrimitivesSchema();
+ RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
- IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> new
IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format));
- assertTrue(e.getMessage().contains("Cannot find field with name
'FIELD1' in the record schema"), e.getMessage());
+ IllegalArgumentException e =
assertThrows(IllegalArgumentException.class,
+ () -> new
IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema,
format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger));
+ assertTrue(e.getMessage().contains("Iceberg requires a non-null value
for required fields"), e.getMessage());
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
- public void testCaseInsensitiveFieldMapping(FileFormat format) throws
IOException {
+ @Test
+ public void testCaseInsensitiveFieldMapping() throws IOException {
RecordSchema nifiSchema = getCaseInsensitiveSchema();
Record record = setupCaseInsensitiveTestRecord();
+ final FileFormat format = FileFormat.AVRO;
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, CASE_INSENSITIVE_SCHEMA, genericRecord, tempFile);
@@ -667,13 +847,13 @@ public class TestIcebergRecordConverter {
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
- public void testUnorderedFieldMapping(FileFormat format) throws
IOException {
+ @Test
+ public void testUnorderedFieldMapping() throws IOException {
RecordSchema nifiSchema = getUnorderedSchema();
Record record = setupUnorderedTestRecord();
+ final FileFormat format = PARQUET;
- IcebergRecordConverter recordConverter = new
IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format);
+ IcebergRecordConverter recordConverter = new
IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format,
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, UNORDERED_SCHEMA, genericRecord, tempFile);
@@ -698,7 +878,7 @@ public class TestIcebergRecordConverter {
assertEquals("value5", resultRecord.get(2, String.class));
assertInstanceOf(Map.class, resultRecord.get(3));
- Map map = resultRecord.get(3, Map.class);
+ Map<?,?> map = resultRecord.get(3, Map.class);
assertEquals("map value1", map.get("key1"));
assertEquals("map value2", map.get("key2"));
}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
index ff8f5a9a3e..49ce684302 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -38,9 +39,8 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -102,14 +102,14 @@ public class TestPutIcebergWithHadoopCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
}
- private void initCatalog(PartitionSpec spec, String fileFormat) throws
InitializationException, IOException {
+ private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws
InitializationException, IOException {
TestHadoopCatalogService catalogService = new
TestHadoopCatalogService();
IcebergCatalogFactory catalogFactory = new
IcebergCatalogFactory(catalogService);
catalog = catalogFactory.create();
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
- tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+ tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT,
fileFormat.name());
catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec,
tableProperties);
@@ -120,16 +120,15 @@ public class TestPutIcebergWithHadoopCatalog {
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @ValueSource(strings = {"avro", "orc", "parquet"})
- public void onTriggerYearTransform(String fileFormat) throws Exception {
+ @Test
+ public void onTriggerYearTransform() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
.year("date")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(spec, fileFormat);
+ initCatalog(spec, FileFormat.PARQUET);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
@@ -148,16 +147,15 @@ public class TestPutIcebergWithHadoopCatalog {
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @ValueSource(strings = {"avro", "orc", "parquet"})
- public void onTriggerMonthTransform(String fileFormat) throws Exception {
+ @Test
+ public void onTriggerMonthTransform() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
.month("timestampMicros")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(spec, fileFormat);
+ initCatalog(spec, FileFormat.ORC);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
@@ -177,16 +175,15 @@ public class TestPutIcebergWithHadoopCatalog {
}
@DisabledOnOs(WINDOWS)
- @ParameterizedTest
- @ValueSource(strings = {"avro", "orc", "parquet"})
- public void onTriggerDayTransform(String fileFormat) throws Exception {
+ @Test
+ public void onTriggerDayTransform() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
.day("timestampMicros")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(spec, fileFormat);
+ initCatalog(spec, FileFormat.AVRO);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index bc159ef470..05d140a829 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -44,10 +45,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -122,10 +122,10 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
}
- private void initCatalog(PartitionSpec spec, String fileFormat) throws
InitializationException {
+ private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws
InitializationException {
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
- tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+ tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT,
fileFormat.name());
TestHiveCatalogService catalogService = new
TestHiveCatalogService.Builder()
.withMetastoreUri(metastore.getThriftConnectionUri())
@@ -143,16 +143,15 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.CATALOG, "catalog-service");
}
- @ParameterizedTest
- @ValueSource(strings = {"avro"})
- public void onTriggerPartitioned(String fileFormat) throws Exception {
+ @Test
+ public void onTriggerPartitioned() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
.bucket("department", 3)
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(spec, fileFormat);
+ initCatalog(spec, FileFormat.AVRO);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
@@ -181,16 +180,15 @@ public class TestPutIcebergWithHiveCatalog {
assertProvenanceEvents();
}
- @ParameterizedTest
- @ValueSource(strings = {"orc"})
- public void onTriggerIdentityPartitioned(String fileFormat) throws
Exception {
+ @Test
+ public void onTriggerIdentityPartitioned() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
.identity("department")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(spec, fileFormat);
+ initCatalog(spec, FileFormat.ORC);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
@@ -219,9 +217,8 @@ public class TestPutIcebergWithHiveCatalog {
assertProvenanceEvents();
}
- @ParameterizedTest
- @ValueSource(strings = {"parquet"})
- public void onTriggerMultiLevelIdentityPartitioned(String fileFormat)
throws Exception {
+ @Test
+ public void onTriggerMultiLevelIdentityPartitioned() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
.identity("name")
.identity("department")
@@ -229,7 +226,7 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(spec, fileFormat);
+ initCatalog(spec, FileFormat.PARQUET);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
@@ -262,12 +259,11 @@ public class TestPutIcebergWithHiveCatalog {
assertProvenanceEvents();
}
- @ParameterizedTest
- @ValueSource(strings = {"avro"})
- public void onTriggerUnPartitioned(String fileFormat) throws Exception {
+ @Test
+ public void onTriggerUnPartitioned() throws Exception {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+ initCatalog(PartitionSpec.unpartitioned(), FileFormat.AVRO);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");