This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f7b1de6b8 NIFI-11739 - Add ability to ignore missing fields in PutIceberg
3f7b1de6b8 is described below

commit 3f7b1de6b8102674f86feff55704450671a2c02c
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Wed Jun 21 20:39:43 2023 -0400

    NIFI-11739 - Add ability to ignore missing fields in PutIceberg
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #7421.
---
 .../iceberg/UnmatchedColumnBehavior.java           |  56 +++++
 .../iceberg/converter/GenericDataConverters.java   |  65 +++++-
 .../iceberg/converter/IcebergRecordConverter.java  |  50 ++++-
 .../apache/nifi/processors/iceberg/PutIceberg.java |  14 +-
 .../processors/iceberg/TestDataFileActions.java    |   6 +-
 .../iceberg/TestIcebergRecordConverter.java        | 246 ++++++++++++++++++---
 .../iceberg/TestPutIcebergWithHadoopCatalog.java   |  29 ++-
 .../iceberg/TestPutIcebergWithHiveCatalog.java     |  36 ++-
 8 files changed, 418 insertions(+), 84 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java
new file mode 100644
index 0000000000..094f0daf60
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Strategies for handling a table column that has no matching field in the incoming record.
+ * Each constant carries the display name and description exposed through {@link DescribedValue}.
+ */
+public enum UnmatchedColumnBehavior implements DescribedValue {
+    IGNORE_UNMATCHED_COLUMN("Ignore Unmatched Columns",
+            "Any column in the database that does not have a field in the document will be assumed to not be required.  No notification will be logged"),
+
+    WARNING_UNMATCHED_COLUMN("Warn on Unmatched Columns",
+            "Any column in the database that does not have a field in the document will be assumed to not be required.  A warning will be logged"),
+
+    FAIL_UNMATCHED_COLUMN("Fail on Unmatched Columns",
+            "A flow will fail if any column in the database does not have a field in the document.  An error will be logged");
+
+
+    private final String displayName;
+    private final String description;
+
+    /**
+     * @param displayName human-readable label for this behavior
+     * @param description explanation of what this behavior does
+     */
+    UnmatchedColumnBehavior(final String displayName, final String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    /**
+     * @return the constant's name, so the value can be resolved back with {@code valueOf}
+     */
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    /**
+     * @return the human-readable label for this behavior
+     */
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    /**
+     * @return the explanation of what this behavior does
+     */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index c8ee7bd171..794d2c5f37 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -21,9 +21,11 @@ import org.apache.commons.lang3.Validate;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
@@ -265,9 +267,16 @@ public class GenericDataConverters {
 
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<RecordField> recordField = 
recordSchema.getField(converter.getSourceFieldName());
-                final RecordField field = recordField.get();
-                // creates a record field accessor for every data converter
-                getters.put(converter.getTargetFieldName(), 
createFieldGetter(field.getDataType(), field.getFieldName(), 
field.isNullable()));
+                if (recordField.isEmpty()) {
+                    final Types.NestedField missingField = 
schema.field(converter.getTargetFieldName());
+                    if (missingField != null) {
+                        getters.put(converter.getTargetFieldName(), 
createFieldGetter(convertSchemaTypeToDataType(missingField.type()), 
missingField.name(), missingField.isOptional()));
+                    }
+                } else {
+                    final RecordField field = recordField.get();
+                    // creates a record field accessor for every data converter
+                    getters.put(converter.getTargetFieldName(), 
createFieldGetter(field.getDataType(), field.getFieldName(), 
field.isNullable()));
+                }
             }
         }
 
@@ -290,4 +299,54 @@ public class GenericDataConverters {
             return converter.convert((S) 
getters.get(converter.getTargetFieldName()).getFieldOrNull(record));
         }
     }
+
+    /**
+     * Converts an Iceberg schema type into the corresponding NiFi record {@link DataType}.
+     * Struct, list and map types are converted recursively; for maps only the value type
+     * and its optionality are carried over. FIXED and BINARY both become an array of bytes.
+     *
+     * @param schemaType the Iceberg type to convert
+     * @return the NiFi data type that corresponds to {@code schemaType}
+     * @throws IllegalArgumentException if the type id is not handled by any case below
+     */
+    public static DataType convertSchemaTypeToDataType(Type schemaType) {
+        switch (schemaType.typeId()) {
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case INTEGER:
+                return RecordFieldType.INT.getDataType();
+            case LONG:
+                return RecordFieldType.LONG.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case DATE:
+                return RecordFieldType.DATE.getDataType();
+            case TIME:
+                return RecordFieldType.TIME.getDataType();
+            case TIMESTAMP:
+                return RecordFieldType.TIMESTAMP.getDataType();
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case UUID:
+                return RecordFieldType.UUID.getDataType();
+            case FIXED:
+            case BINARY:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case DECIMAL:
+                // Keep the column's declared precision and scale rather than the generic decimal type
+                final Types.DecimalType decimalType = (Types.DecimalType) schemaType;
+                return RecordFieldType.DECIMAL.getDecimalDataType(decimalType.precision(), decimalType.scale());
+            case STRUCT:
+                // Build a record type from the struct type
+                Types.StructType structType = schemaType.asStructType();
+                List<Types.NestedField> fields = structType.fields();
+                List<RecordField> recordFields = new ArrayList<>(fields.size());
+                for (Types.NestedField field : fields) {
+                    DataType dataType = convertSchemaTypeToDataType(field.type());
+                    recordFields.add(new RecordField(field.name(), dataType, field.isOptional()));
+                }
+                RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+            case LIST:
+                // Build a list type from the elements
+                Types.ListType listType = schemaType.asListType();
+                return RecordFieldType.ARRAY.getArrayDataType(convertSchemaTypeToDataType(listType.elementType()), listType.isElementOptional());
+            case MAP:
+                // Build a map type from the elements
+                Types.MapType mapType = schemaType.asMapType();
+                return RecordFieldType.MAP.getMapDataType(convertSchemaTypeToDataType(mapType.valueType()), mapType.isValueOptional());
+        }
+        // Reached only when no case above matched the type id
+        throw new IllegalArgumentException("Invalid or unsupported type: " + schemaType);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
index 33049123ef..49ff37f475 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
@@ -25,6 +25,8 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.iceberg.UnmatchedColumnBehavior;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -46,19 +48,26 @@ import java.util.stream.Collectors;
 public class IcebergRecordConverter {
 
     private final DataConverter<Record, GenericRecord> converter;
+    public final UnmatchedColumnBehavior unmatchedColumnBehavior;
+    public ComponentLog logger;
+
     public GenericRecord convert(Record record) {
         return converter.convert(record);
     }
 
+
     @SuppressWarnings("unchecked")
-    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, 
FileFormat fileFormat) {
-        this.converter = (DataConverter<Record, GenericRecord>) 
IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), 
fileFormat);
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, 
FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, 
ComponentLog logger) {
+        this.converter = (DataConverter<Record, GenericRecord>) 
IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), 
fileFormat, unmatchedColumnBehavior, logger);
+        this.unmatchedColumnBehavior = unmatchedColumnBehavior;
+        this.logger = logger;
     }
 
     private static class IcebergSchemaVisitor extends 
SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
 
-        public static DataConverter<?, ?> visit(Schema schema, RecordDataType 
recordDataType, FileFormat fileFormat) {
-            return visit(schema, new RecordTypeWithFieldNameMapper(schema, 
recordDataType), new IcebergSchemaVisitor(), new 
IcebergPartnerAccessors(schema, fileFormat));
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType 
recordDataType, FileFormat fileFormat, UnmatchedColumnBehavior 
unmatchedColumnBehavior, ComponentLog logger) {
+            return visit(schema, new RecordTypeWithFieldNameMapper(schema, 
recordDataType), new IcebergSchemaVisitor(),
+                    new IcebergPartnerAccessors(schema, fileFormat, 
unmatchedColumnBehavior, logger));
         }
 
         @Override
@@ -123,8 +132,10 @@ public class IcebergRecordConverter {
             // set NiFi schema field names (sourceFieldName) in the data 
converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = 
recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = 
recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isPresent()) {
+                    final Optional<RecordField> recordField = 
recordSchema.getField(mappedFieldName.get());
+                    
converter.setSourceFieldName(recordField.get().getFieldName());
+                }
             }
 
             return new GenericDataConverters.RecordConverter(converters, 
recordSchema, type);
@@ -144,10 +155,14 @@ public class IcebergRecordConverter {
     public static class IcebergPartnerAccessors implements 
SchemaWithPartnerVisitor.PartnerAccessors<DataType> {
         private final Schema schema;
         private final FileFormat fileFormat;
+        private final UnmatchedColumnBehavior unmatchedColumnBehavior;
+        private final ComponentLog logger;
 
-        IcebergPartnerAccessors(Schema schema, FileFormat fileFormat) {
+        IcebergPartnerAccessors(Schema schema, FileFormat fileFormat, 
UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) {
             this.schema = schema;
             this.fileFormat = fileFormat;
+            this.unmatchedColumnBehavior = unmatchedColumnBehavior;
+            this.logger = logger;
         }
 
         @Override
@@ -156,8 +171,25 @@ public class IcebergRecordConverter {
             final RecordTypeWithFieldNameMapper recordType = 
(RecordTypeWithFieldNameMapper) dataType;
 
             final Optional<String> mappedFieldName = 
recordType.getNameMapping(name);
-            Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot 
find field with name '%s' in the record schema", name));
-
+            if 
(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) 
{
+                Validate.isTrue(mappedFieldName.isPresent(), 
String.format("Cannot find field with name '%s' in the record schema", name));
+            }
+            if (mappedFieldName.isEmpty()) {
+                if 
(UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior))
 {
+                    if (logger != null) {
+                        logger.warn("Cannot find field with name '" + name + 
"' in the record schema, using the target schema for datatype and a null 
value");
+                    }
+                }
+                // If the field is missing, use the expected type from the 
schema (converted to a DataType)
+                final Types.NestedField schemaField = 
schema.findField(fieldId);
+                final Type schemaFieldType = schemaField.type();
+                if (schemaField.isRequired()) {
+                    // Iceberg requires a non-null value for required fields
+                    throw new IllegalArgumentException("Iceberg requires a 
non-null value for required fields, field: "
+                            + schemaField.name() + ", type: " + 
schemaFieldType);
+                }
+                return 
GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType);
+            }
             final Optional<RecordField> recordField = 
recordType.getChildSchema().getField(mappedFieldName.get());
             final RecordField field = recordField.get();
 
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 02bd0b074f..22b1ec5507 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -108,6 +108,14 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
+    // Required property selecting how a table column without a matching record field is handled.
+    // Allowable values come from UnmatchedColumnBehavior; the default is FAIL_UNMATCHED_COLUMN.
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new 
PropertyDescriptor.Builder()
+            .name("unmatched-column-behavior")
+            .displayName("Unmatched Column Behavior")
+            .description("If an incoming record does not have a field mapping 
for all of the database table's columns, this property specifies how to handle 
the situation.")
+            .allowableValues(UnmatchedColumnBehavior.class)
+            
.defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
+            .required(true)
+            .build();
     static final PropertyDescriptor FILE_FORMAT = new 
PropertyDescriptor.Builder()
             .name("file-format")
             .displayName("File Format")
@@ -178,6 +186,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             CATALOG,
             CATALOG_NAMESPACE,
             TABLE_NAME,
+            UNMATCHED_COLUMN_BEHAVIOR,
             FILE_FORMAT,
             MAXIMUM_FILE_SIZE,
             KERBEROS_USER_SERVICE,
@@ -256,8 +265,10 @@ public class PutIceberg extends AbstractIcebergProcessor {
             final FileFormat format = getFileFormat(table.properties(), 
fileFormat);
             final IcebergTaskWriterFactory taskWriterFactory = new 
IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize);
             taskWriter = taskWriterFactory.create();
+            final UnmatchedColumnBehavior unmatchedColumnBehavior =
+                    
UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
 
-            final IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(table.schema(), reader.getSchema(), format);
+            final IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(table.schema(), reader.getSchema(), format, 
unmatchedColumnBehavior, getLogger());
 
             Record record;
             while ((record = reader.nextRecord()) != null) {
@@ -353,5 +364,4 @@ public class PutIceberg extends AbstractIcebergProcessor {
                 .retry(3)
                 .run(file -> table.io().deleteFile(file.path().toString()));
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
index 4e535c3f8a..cc45705eb3 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -32,6 +32,8 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
@@ -79,10 +81,12 @@ public class TestDataFileActions {
     );
 
     private PutIceberg icebergProcessor;
+    private ComponentLog logger;
 
     @BeforeEach
     public void setUp() {
         icebergProcessor = new PutIceberg();
+        logger = new MockComponentLogger();
     }
 
     @DisabledOnOs(WINDOWS)
@@ -103,7 +107,7 @@ public class TestDataFileActions {
         IcebergTaskWriterFactory taskWriterFactory = new 
IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, 
null);
         TaskWriter<Record> taskWriter = taskWriterFactory.create();
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
 
         for (MapRecord record : recordList) {
             taskWriter.write(recordConverter.convert(record));
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index baf220fea2..c064c723f0 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -37,6 +37,8 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
 import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
 import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter;
@@ -57,6 +59,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
+
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -92,9 +95,12 @@ public class TestIcebergRecordConverter {
 
     private OutputFile tempFile;
 
+    private ComponentLog logger;
+
     @BeforeEach
     public void setUp() throws Exception {
         tempFile = Files.localOutput(createTempFile("test", null));
+        logger = new MockComponentLogger();
     }
 
     @AfterEach
@@ -145,6 +151,24 @@ public class TestIcebergRecordConverter {
             Types.NestedField.optional(14, "choice", Types.IntegerType.get())
     );
 
+    // Primitives table schema in which "integer", "float" and "long" are declared required
+    // while every other column is optional.
+    // NOTE(review): "timestamp" is declared with zone and "timestampTz" without zone; the names
+    // look swapped relative to their types - confirm this is intentional.
+    private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new 
Schema(
+            Types.NestedField.optional(0, "string", Types.StringType.get()),
+            Types.NestedField.required(1, "integer", Types.IntegerType.get()),
+            Types.NestedField.required(2, "float", Types.FloatType.get()),
+            Types.NestedField.required(3, "long", Types.LongType.get()),
+            Types.NestedField.optional(4, "double", Types.DoubleType.get()),
+            Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 
2)),
+            Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
+            Types.NestedField.optional(7, "fixed", 
Types.FixedType.ofLength(5)),
+            Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
+            Types.NestedField.optional(9, "date", Types.DateType.get()),
+            Types.NestedField.optional(10, "time", Types.TimeType.get()),
+            Types.NestedField.optional(11, "timestamp", 
Types.TimestampType.withZone()),
+            Types.NestedField.optional(12, "timestampTz", 
Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
+            Types.NestedField.optional(14, "choice", Types.IntegerType.get())
+    );
+
     private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
             Types.NestedField.optional(0, "string", Types.StringType.get()),
             Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
@@ -240,6 +264,24 @@ public class TestIcebergRecordConverter {
         return new SimpleRecordSchema(fields);
     }
 
+    /**
+     * Builds a NiFi record schema of primitive fields that has no
+     * "integer", "float" or "long" field.
+     *
+     * @return the record schema with those three fields left out
+     */
+    private static RecordSchema getPrimitivesSchemaMissingFields() {
+        final List<RecordField> schemaFields = new ArrayList<>(List.of(
+                new RecordField("string", RecordFieldType.STRING.getDataType()),
+                new RecordField("double", RecordFieldType.DOUBLE.getDataType()),
+                new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)),
+                new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
+                new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
+                new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
+                new RecordField("date", RecordFieldType.DATE.getDataType()),
+                new RecordField("time", RecordFieldType.TIME.getDataType()),
+                new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()),
+                new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()),
+                new RecordField("uuid", RecordFieldType.UUID.getDataType()),
+                new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()))
+        ));
+
+        return new SimpleRecordSchema(schemaFields);
+    }
+
     private static RecordSchema getPrimitivesAsCompatiblesSchema() {
         List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("string", 
RecordFieldType.INT.getDataType()));
@@ -370,6 +412,29 @@ public class TestIcebergRecordConverter {
         return new MapRecord(getPrimitivesSchema(), values);
     }
 
+    /**
+     * Creates a record populated with one value per field of the schema returned by
+     * {@code getPrimitivesSchemaMissingFields()}.
+     *
+     * @return the populated test record
+     */
+    private static Record setupPrimitivesTestRecordMissingFields() {
+        final LocalDateTime dateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        final OffsetDateTime zonedDateTime = OffsetDateTime.of(dateTime, ZoneOffset.ofHours(-5));
+
+        final Map<String, Object> fieldValues = new HashMap<>();
+        fieldValues.put("string", "Test String");
+        fieldValues.put("double", 3.14159D);
+        fieldValues.put("decimal", new BigDecimal("12345678.12"));
+        fieldValues.put("boolean", true);
+        fieldValues.put("fixed", "hello".getBytes());
+        fieldValues.put("binary", "hello".getBytes());
+        fieldValues.put("date", LocalDate.of(2017, 4, 4));
+        fieldValues.put("time", Time.valueOf(LocalTime.of(14, 20, 33)));
+        fieldValues.put("timestamp", Timestamp.from(zonedDateTime.toInstant()));
+        fieldValues.put("timestampTz", Timestamp.valueOf(dateTime));
+        fieldValues.put("uuid", UUID.fromString("0000-00-00-00-000000"));
+        fieldValues.put("choice", "10");
+
+        final RecordSchema schema = getPrimitivesSchemaMissingFields();
+        return new MapRecord(schema, fieldValues);
+    }
+
     private static Record setupCompatiblePrimitivesTestRecord() {
 
         Map<String, Object> values = new HashMap<>();
@@ -439,7 +504,7 @@ public class TestIcebergRecordConverter {
         RecordSchema nifiSchema = getPrimitivesSchema();
         Record record = setupPrimitivesTestRecord();
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
@@ -472,6 +537,54 @@ public class TestIcebergRecordConverter {
         } else {
             assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(13, UUID.class));
         }
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesIgnoreMissingFields(FileFormat format) throws 
IOException {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        Record record = setupPrimitivesTestRecordMissingFields();
+        MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, 
tempFile.toInputFile());
+
+        assertEquals(results.size(), 1);
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 
789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, 
ZoneOffset.ofHours(-5));
+
+        assertEquals("Test String", resultRecord.get(0, String.class));
+        assertNull(resultRecord.get(1, Integer.class));
+        assertNull(resultRecord.get(2, Float.class));
+        assertNull(resultRecord.get(3, Long.class));
+        assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, 
Double.class));
+        assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, 
BigDecimal.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(7, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, 
resultRecord.get(8, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, 
LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, 
LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(11, OffsetDateTime.class));
+        assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(12, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+
+        if (format.equals(FileFormat.PARQUET)) {
+            // Parquet uses a conversion to the byte values of numeric 
characters such as "0" -> byte value 0
+            UUID uuid = UUID.fromString("0000-00-00-00-000000");
+            ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+            byteBuffer.putLong(uuid.getMostSignificantBits());
+            byteBuffer.putLong(uuid.getLeastSignificantBits());
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, 
byte[].class));
+        } else {
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(13, UUID.class));
+        }
 
         // Test null values
         for (String fieldName : record.getRawFieldNames()) {
@@ -504,11 +617,81 @@ public class TestIcebergRecordConverter {
     @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCompatiblePrimitives(FileFormat format) throws IOException 
{
+    public void testPrimitivesMissingRequiredFields(FileFormat format) {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+        assertThrows(IllegalArgumentException.class,
+                () -> new 
IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, 
format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesWarnMissingFields(FileFormat format) throws IOException {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        Record record = setupPrimitivesTestRecordMissingFields();
+        MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN, mockComponentLogger);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
+
+        assertEquals(1, results.size());
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+
+        assertEquals("Test String", resultRecord.get(0, String.class));
+        assertNull(resultRecord.get(1, Integer.class));
+        assertNull(resultRecord.get(2, Float.class));
+        assertNull(resultRecord.get(3, Long.class));
+        assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
+        assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
+        assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
+        assertEquals(localDateTime, resultRecord.get(12, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+
+        if (format.equals(FileFormat.PARQUET)) {
+            // Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0
+            UUID uuid = UUID.fromString("0000-00-00-00-000000");
+            ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+            byteBuffer.putLong(uuid.getMostSignificantBits());
+            byteBuffer.putLong(uuid.getLeastSignificantBits());
+            assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
+        } else {
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
+        }
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesFailMissingFields(FileFormat format) {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        MockComponentLogger mockComponentLogger = new MockComponentLogger();
+
+        assertThrows(IllegalArgumentException.class,
+                () -> new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, mockComponentLogger));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testCompatiblePrimitives() throws IOException {
         RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema();
         Record record = setupCompatiblePrimitivesTestRecord();
+        final FileFormat format = PARQUET;
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile);
@@ -536,21 +719,17 @@ public class TestIcebergRecordConverter {
         assertEquals(expectedLocalDateTimestamp, resultRecord.get(11, 
LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class));
 
-        if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0}, resultRecord.get(12, byte[].class));
-        } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(12, UUID.class));
-        }
+        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0}, resultRecord.get(12, byte[].class));
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testStruct(FileFormat format) throws IOException {
+    @Test
+    public void testStruct() throws IOException {
         RecordSchema nifiSchema = getStructSchema();
         Record record = setupStructTestRecord();
+        final FileFormat format = FileFormat.ORC;
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, STRUCT_SCHEMA, genericRecord, tempFile);
@@ -574,13 +753,13 @@ public class TestIcebergRecordConverter {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testList(FileFormat format) throws IOException {
+    @Test
+    public void testList() throws IOException {
         RecordSchema nifiSchema = getListSchema();
         Record record = setupListTestRecord();
+        final FileFormat format = FileFormat.AVRO;
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, LIST_SCHEMA, genericRecord, tempFile);
@@ -593,7 +772,7 @@ public class TestIcebergRecordConverter {
 
         assertEquals(1, resultRecord.size());
         assertInstanceOf(List.class, resultRecord.get(0));
-        List nestedList = resultRecord.get(0, List.class);
+        List<?> nestedList = resultRecord.get(0, List.class);
 
         assertEquals(2, nestedList.size());
         assertInstanceOf(List.class, nestedList.get(0));
@@ -604,13 +783,13 @@ public class TestIcebergRecordConverter {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testMap(FileFormat format) throws IOException {
+    @Test
+    public void testMap() throws IOException {
         RecordSchema nifiSchema = getMapSchema();
         Record record = setupMapTestRecord();
+        final FileFormat format = PARQUET;
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, MAP_SCHEMA, genericRecord, tempFile);
@@ -636,20 +815,21 @@ public class TestIcebergRecordConverter {
     @ParameterizedTest
     @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
     public void testSchemaMismatch(FileFormat format) {
-        RecordSchema nifiSchema = getPrimitivesSchema();
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
 
-        IllegalArgumentException e = 
assertThrows(IllegalArgumentException.class, () -> new 
IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format));
-        assertTrue(e.getMessage().contains("Cannot find field with name 
'FIELD1' in the record schema"), e.getMessage());
+        IllegalArgumentException e = 
assertThrows(IllegalArgumentException.class,
+                () -> new 
IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, 
format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger));
+        assertTrue(e.getMessage().contains("Iceberg requires a non-null value 
for required fields"), e.getMessage());
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCaseInsensitiveFieldMapping(FileFormat format) throws 
IOException {
+    @Test
+    public void testCaseInsensitiveFieldMapping() throws IOException {
         RecordSchema nifiSchema = getCaseInsensitiveSchema();
         Record record = setupCaseInsensitiveTestRecord();
+        final FileFormat format = FileFormat.AVRO;
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, CASE_INSENSITIVE_SCHEMA, genericRecord, tempFile);
@@ -667,13 +847,13 @@ public class TestIcebergRecordConverter {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testUnorderedFieldMapping(FileFormat format) throws 
IOException {
+    @Test
+    public void testUnorderedFieldMapping() throws IOException {
         RecordSchema nifiSchema = getUnorderedSchema();
         Record record = setupUnorderedTestRecord();
+        final FileFormat format = PARQUET;
 
-        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format);
+        IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format, 
UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
         GenericRecord genericRecord = recordConverter.convert(record);
 
         writeTo(format, UNORDERED_SCHEMA, genericRecord, tempFile);
@@ -698,7 +878,7 @@ public class TestIcebergRecordConverter {
         assertEquals("value5", resultRecord.get(2, String.class));
 
         assertInstanceOf(Map.class, resultRecord.get(3));
-        Map map = resultRecord.get(3, Map.class);
+        Map<?,?> map = resultRecord.get(3, Map.class);
         assertEquals("map value1", map.get("key1"));
         assertEquals("map value2", map.get("key2"));
     }
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
index ff8f5a9a3e..49ce684302 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg;
 
 import org.apache.avro.Schema;
 import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -38,9 +39,8 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -102,14 +102,14 @@ public class TestPutIcebergWithHadoopCatalog {
         runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
     }
 
-    private void initCatalog(PartitionSpec spec, String fileFormat) throws 
InitializationException, IOException {
+    private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws 
InitializationException, IOException {
         TestHadoopCatalogService catalogService = new 
TestHadoopCatalogService();
         IcebergCatalogFactory catalogFactory = new 
IcebergCatalogFactory(catalogService);
         catalog = catalogFactory.create();
 
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
-        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.name());
 
         catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec, 
tableProperties);
 
@@ -120,16 +120,15 @@ public class TestPutIcebergWithHadoopCatalog {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @ValueSource(strings = {"avro", "orc", "parquet"})
-    public void onTriggerYearTransform(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerYearTransform() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
                 .year("date")
                 .build();
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.PARQUET);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
@@ -148,16 +147,15 @@ public class TestPutIcebergWithHadoopCatalog {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @ValueSource(strings = {"avro", "orc", "parquet"})
-    public void onTriggerMonthTransform(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerMonthTransform() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
                 .month("timestampMicros")
                 .build();
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.ORC);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
@@ -177,16 +175,15 @@ public class TestPutIcebergWithHadoopCatalog {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @ValueSource(strings = {"avro", "orc", "parquet"})
-    public void onTriggerDayTransform(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerDayTransform() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
                 .day("timestampMicros")
                 .build();
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.AVRO);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index bc159ef470..05d140a829 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg;
 
 import org.apache.avro.Schema;
 import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -44,10 +45,9 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -122,10 +122,10 @@ public class TestPutIcebergWithHiveCatalog {
         runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
     }
 
-    private void initCatalog(PartitionSpec spec, String fileFormat) throws 
InitializationException {
+    private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws 
InitializationException {
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
-        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
+        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.name());
 
         TestHiveCatalogService catalogService = new 
TestHiveCatalogService.Builder()
                 .withMetastoreUri(metastore.getThriftConnectionUri())
@@ -143,16 +143,15 @@ public class TestPutIcebergWithHiveCatalog {
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .bucket("department", 3)
                 .build();
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.AVRO);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
@@ -181,16 +180,15 @@ public class TestPutIcebergWithHiveCatalog {
         assertProvenanceEvents();
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"orc"})
-    public void onTriggerIdentityPartitioned(String fileFormat) throws 
Exception {
+    @Test
+    public void onTriggerIdentityPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .identity("department")
                 .build();
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.ORC);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
@@ -219,9 +217,8 @@ public class TestPutIcebergWithHiveCatalog {
         assertProvenanceEvents();
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"parquet"})
-    public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) 
throws Exception {
+    @Test
+    public void onTriggerMultiLevelIdentityPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .identity("name")
                 .identity("department")
@@ -229,7 +226,7 @@ public class TestPutIcebergWithHiveCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.PARQUET);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
@@ -262,12 +259,11 @@ public class TestPutIcebergWithHiveCatalog {
         assertProvenanceEvents();
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerUnPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerUnPartitioned() throws Exception {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+        initCatalog(PartitionSpec.unpartitioned(), FileFormat.AVRO);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
         runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
         runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");

Reply via email to