nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r982188863


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+
+/**
+ * Base Iceberg processor class.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that 
should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    private Configuration configuration;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) {
+        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        final IcebergCatalogService catalogService = 
context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+
+        if (catalogService != null) {
+            this.configuration = catalogService.getConfiguration();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;

Review Comment:
   I am not sure, but I think we should either assign null to `kerberosUser` in a 
finally block or rethrow the KerberosLoginException.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between 
NiFi and Iceberg internal record structure.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
+    public GenericRecord convert(Record record) {
+        return converter.convert(record);
+    }
+
+    @SuppressWarnings("unchecked")
+    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, 
FileFormat fileFormat) {
+        this.converter = (DataConverter<Record, GenericRecord>) 
IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), 
fileFormat);
+    }
+
+    private static class IcebergSchemaVisitor extends 
SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
+        public static DataConverter<?, ?> visit(Schema schema, RecordDataType 
recordDataType, FileFormat fileFormat) {
+            return visit(schema, recordDataType, new IcebergSchemaVisitor(), 
new IcebergPartnerAccessors(fileFormat));
+        }
+
+        @Override
+        public DataConverter<?, ?> schema(Schema schema, DataType dataType, 
DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> field(Types.NestedField field, DataType 
dataType, DataConverter<?, ?> converter) {
+            return converter;
+        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType 
dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return 
GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        Types.TimestampType timestampType = 
(Types.TimestampType) type;

Review Comment:
   Missing finals.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {

Review Comment:
   Probably I'm blind, but I don't see where this logic was moved. Don't we need 
this validation anymore?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+
+public class TestPutIcebergWithHadoopCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    public static Namespace namespace = Namespace.of("test_metastore");
+
+    public static TableIdentifier tableIdentifier = 
TableIdentifier.of(namespace, "date");
+
+    public static org.apache.iceberg.Schema DATE_SCHEMA = new 
org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+            Types.NestedField.required(2, "timestampMicros", 
Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "date", Types.DateType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        final String avroSchema = 
IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")),
 StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();

Review Comment:
   The question still stands; it's not mandatory — just reply with a -1 if you don't agree.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && 
validationContext.getProperty(CATALOG_NAMESPACE).isSet() && 
validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new 
ValidationResult.Builder().explanation("The provided table has incompatible 
table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new 
IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the 
schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, 
firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), 
flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {

Review Comment:
   thx



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that 
should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws 
IOException {
+        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (final KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);

Review Comment:
   I think this problem still stands; it was just moved to 
AbstractIcebergProcessor. What do you think?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+
+/**
+ * Base Iceberg processor class.
+ */
+public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that 
should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
+    private volatile KerberosUser kerberosUser;
+
+    private Configuration configuration;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) {
+        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        final IcebergCatalogService catalogService = 
context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+
+        if (catalogService != null) {
+            this.configuration = catalogService.getConfiguration();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final KerberosUser kerberosUser = getKerberosUser();
+        if (kerberosUser == null) {
+            doOnTrigger(context, session);
+        } else {
+            try {
+                final UserGroupInformation ugi = 
getUgiForKerberosUser(configuration, kerberosUser);
+
+                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                    doOnTrigger(context, session);
+                    return null;
+                });
+
+            } catch (Exception e) {
+                throw new ProcessException(e);

Review Comment:
   In general, exceptions should include a basic message instead of just 
wrapping the cause.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.iceberg.converter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is responsible for schema traversal and data conversion between 
NiFi and Iceberg internal record structure.
+ */
+public class IcebergRecordConverter {
+
+    private final DataConverter<Record, GenericRecord> converter;
+
    /**
     * Converts a NiFi {@link Record} into an Iceberg {@link GenericRecord}
     * using the converter chain built for the schemas in the constructor.
     */
    public GenericRecord convert(Record record) {
        return converter.convert(record);
    }
+
    /**
     * Builds the converter chain by visiting the Iceberg schema together with the
     * NiFi record schema. The file format is carried along because some types
     * (e.g. UUID) are written differently per format.
     *
     * @param schema       the target Iceberg table schema
     * @param recordSchema the schema of the incoming NiFi records
     * @param fileFormat   the data file format the records will be written in
     */
    @SuppressWarnings("unchecked")  // visitor returns DataConverter<?, ?>; the root is always Record -> GenericRecord
    public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
    }
+
+    private static class IcebergSchemaVisitor extends 
SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
+
        /**
         * Entry point: traverses the Iceberg schema with the NiFi record type as
         * its partner and returns the root converter for the whole record.
         */
        public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
            return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
        }
+
        @Override
        public DataConverter<?, ?> schema(Schema schema, DataType dataType, DataConverter<?, ?> converter) {
            // The converter built for the schema's root struct is returned unchanged.
            return converter;
        }
+
        @Override
        public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
            // Fields need no wrapping of their own; the child converter is used directly.
            return converter;
        }
+
+        @Override
+        public DataConverter<?, ?> primitive(Type.PrimitiveType type, DataType 
dataType) {
+            if (type.typeId() != null) {
+                switch (type.typeId()) {
+                    case BOOLEAN:
+                    case INTEGER:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case DATE:
+                    case STRING:
+                        return 
GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case TIME:
+                        return GenericDataConverters.TimeConverter.INSTANCE;
+                    case TIMESTAMP:
+                        Types.TimestampType timestampType = 
(Types.TimestampType) type;
+                        if (timestampType.shouldAdjustToUTC()) {
+                            return 
GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
+                        }
+                        return 
GenericDataConverters.TimestampConverter.INSTANCE;
+                    case UUID:
+                        UUIDDataType uuidType = (UUIDDataType) dataType;
+                        if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+                            return 
GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
+                        }
+                        return 
GenericDataConverters.SameTypeConverter.INSTANCE;
+                    case FIXED:
+                        Types.FixedType fixedType = (Types.FixedType) type;
+                        return new 
GenericDataConverters.FixedConverter(fixedType.length());
+                    case BINARY:
+                        return GenericDataConverters.BinaryConverter.INSTANCE;
+                    case DECIMAL:
+                        Types.DecimalType decimalType = (Types.DecimalType) 
type;
+                        return new 
GenericDataConverters.BigDecimalConverter(decimalType.precision(), 
decimalType.scale());
+                    default:
+                        throw new UnsupportedOperationException("Unsupported 
type: " + type.typeId());
+                }
+            }
+            return null;

Review Comment:
   Is it a valid scenario to return null here? If we cannot support an unknown 
type, how can we support a null type?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && 
validationContext.getProperty(CATALOG_NAMESPACE).isSet() && 
validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new 
ValidationResult.Builder().explanation("The provided table has incompatible 
table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // PROPERTIES is an unmodifiable list, so it is safe to expose directly.
        return PROPERTIES;
    }
+
    @Override
    public Set<Relationship> getRelationships() {
        // RELATIONSHIPS is an unmodifiable set, so it is safe to expose directly.
        return RELATIONSHIPS;
    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new 
IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the 
schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, 
firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), 
flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();

Review Comment:
   thx



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && 
validationContext.getProperty(CATALOG_NAMESPACE).isSet() && 
validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new 
ValidationResult.Builder().explanation("The provided table has incompatible 
table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // PROPERTIES is an unmodifiable list, so it is safe to expose directly.
        return PROPERTIES;
    }
+
    @Override
    public Set<Relationship> getRelationships() {
        // RELATIONSHIPS is an unmodifiable set, so it is safe to expose directly.
        return RELATIONSHIPS;
    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new 
IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the 
schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, 
firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), 
flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            getLogger().error("Exception occurred while writing iceberg 
records. Removing uncommitted data files.", e);
+            try {
+                if (taskWriterFactory != null) {
+                    taskWriter.abort();
+                }
+            } catch (IOException ex) {
+                throw new ProcessException("Failed to abort uncommitted data 
files.", ex);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, 
String.valueOf(recordCount));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+
+    private Table initializeTable(PropertyContext context) {
+        final IcebergCatalogService catalogService = 
context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final String catalogNamespace = 
context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+        final Catalog catalog = catalogService.getCatalog();
+
+        final Namespace namespace = Namespace.of(catalogNamespace);
+        final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, 
tableName);
+
+        return catalog.loadTable(tableIdentifier);

Review Comment:
   I think this question still stands, but even if you agree, I don't think we 
should do that in this PR.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/appender/avro/AvroWithNifiSchemaVisitor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.appender.avro;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.avro.AvroWithPartnerByStructureVisitor;
+import org.apache.iceberg.util.Pair;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+/**
+ * This class contains Avro specific visitor methods to traverse schema and 
build value writer list for data types.
+ */
+public class AvroWithNifiSchemaVisitor<T> extends 
AvroWithPartnerByStructureVisitor<DataType, T> {
+
+    @Override
+    protected boolean isStringType(DataType dataType) {
+        return dataType.getFieldType().equals(RecordFieldType.STRING);
+    }
+
+    @Override
+    protected boolean isMapType(DataType dataType) {
+        return dataType instanceof MapDataType;
+    }
+
+    @Override
+    protected DataType arrayElementType(DataType arrayType) {
+        Preconditions.checkArgument(arrayType instanceof ArrayDataType, 
"Invalid array: %s is not an array", arrayType);

Review Comment:
   thx for excluding it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to