nandorsoma commented on code in PR #6368:
URL: https://github.com/apache/nifi/pull/6368#discussion_r967896686


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && 
validationContext.getProperty(CATALOG_NAMESPACE).isSet() && 
validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new 
ValidationResult.Builder().explanation("The provided table has incompatible 
table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new 
IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the 
schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, 
firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), 
flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {

Review Comment:
   I'm wondering: are we sure that we want to transfer the FlowFile only on 
checked exceptions? For example, `commit` doesn't declare a checked exception, 
but I can imagine that it can easily throw a runtime exception. If we don't 
yield the processor on IOException, we probably shouldn't yield it on a 
runtime exception from `commit` either.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {

Review Comment:
   @turcsanyip already mentioned that this part would fit in the verify method. 
What I would add is that the validation message currently contains a raw 
exception when the table is not an Iceberg table:
   ```
   Component is invalid: 'Component' is invalid because Failed to perform 
validation due to org.apache.iceberg.exceptions.NoSuchIcebergTableException: 
Not an iceberg table: hive-catalog.nifitest.icebergtable (type=null)
   ``` 
   I'm not sure about this. Is it OK to leak exceptions during validation, or 
should we use predefined error messages? Usually predefined messages are 
better, but that would also mean using a generic message as a fallback, which 
wouldn't be helpful at all.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/appender/avro/AvroWithNifiSchemaVisitor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.appender.avro;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.avro.AvroWithPartnerByStructureVisitor;
+import org.apache.iceberg.util.Pair;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+/**
+ * This class contains Avro specific visitor methods to traverse schema and 
build value writer list for data types.
+ */
+public class AvroWithNifiSchemaVisitor<T> extends 
AvroWithPartnerByStructureVisitor<DataType, T> {
+
+    @Override
+    protected boolean isStringType(DataType dataType) {
+        return dataType.getFieldType().equals(RecordFieldType.STRING);
+    }
+
+    @Override
+    protected boolean isMapType(DataType dataType) {
+        return dataType instanceof MapDataType;
+    }
+
+    @Override
+    protected DataType arrayElementType(DataType arrayType) {
+        Preconditions.checkArgument(arrayType instanceof ArrayDataType, 
"Invalid array: %s is not an array", arrayType);

Review Comment:
   AFAIK in NiFi we aim to avoid Guava. I just checked: in most dependencies we 
either exclude it or use the provided version. What makes things more 
complicated here is that the hive-exec-3.1.3 library appears to shade it. I 
think we can't get rid of it entirely, but we shouldn't use it in plain 
NiFi code.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+
+public class TestPutIcebergWithHadoopCatalog {
+
+    private TestRunner runner;
+    private PutIceberg processor;
+    private Schema inputSchema;
+
+    public static Namespace namespace = Namespace.of("test_metastore");
+
+    public static TableIdentifier tableIdentifier = 
TableIdentifier.of(namespace, "date");
+
+    public static org.apache.iceberg.Schema DATE_SCHEMA = new 
org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
+            Types.NestedField.required(2, "timestampMicros", 
Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "date", Types.DateType.get())
+    );
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        final String avroSchema = 
IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")),
 StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+
+        processor = new PutIceberg();
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();

Review Comment:
   AFAIK `final` is not mandatory in tests, but I see that you sometimes use it 
and sometimes don't. Could you declare it consistently?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
+    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("catalog-service")
+            .displayName("Catalog Service")
+            .description("Specifies the Controller Service to use for handling 
references to table’s metadata files.")
+            .identifiesControllerService(IcebergCatalogService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAMESPACE = new 
PropertyDescriptor.Builder()
+            .name("catalog-namespace")
+            .displayName("Catalog namespace")
+            .description("The namespace of the catalog.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Iceberg table name")
+            .description("The name of the Iceberg table.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
data ingestion failed and retrying the operation will also fail, "
+                    + "such as an invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            RECORD_READER,
+            CATALOG,
+            TABLE_NAME,
+            CATALOG_NAMESPACE,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && 
validationContext.getProperty(CATALOG_NAMESPACE).isSet() && 
validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new 
ValidationResult.Builder().explanation("The provided table has incompatible 
table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new 
IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the 
schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, 
firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), 
flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();
+                fileCommitter.commit(result);
+            }
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            getLogger().error("Exception occurred while writing iceberg 
records. Removing uncommitted data files.", e);
+            try {
+                if (taskWriterFactory != null) {
+                    taskWriter.abort();
+                }
+            } catch (IOException ex) {
+                throw new ProcessException("Failed to abort uncommitted data 
files.", ex);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, 
String.valueOf(recordCount));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+
+    private Table initializeTable(PropertyContext context) {
+        final IcebergCatalogService catalogService = 
context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        final String catalogNamespace = 
context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+
+        final Catalog catalog = catalogService.getCatalog();
+
+        final Namespace namespace = Namespace.of(catalogNamespace);
+        final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, 
tableName);
+
+        return catalog.loadTable(tableIdentifier);

Review Comment:
   If I understand correctly, on every trigger there are two network 
interactions with the database. If all data were sent to the same table, it 
would make sense to cache the returned `Table`. Is that possible? Does it make 
sense?



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/KerberosAwareBaseProcessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A base processor class for Kerberos aware usage.
+ */
+public abstract class KerberosAwareBaseProcessor extends AbstractProcessor {
+
// Optional controller service supplying the Kerberos identity used by subclasses.
static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
        .name("kerberos-user-service")
        .displayName("Kerberos User Service")
        .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
        .identifiesControllerService(KerberosUserService.class)
        .required(false)
        .build();

// Current Kerberos user, created in onScheduled and released in closeClient;
// volatile because it may be accessed from multiple framework threads.
private volatile KerberosUser kerberosUser;
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws 
IOException {
+        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            this.kerberosUser = kerberosUserService.createKerberosUser();
+        }
+    }
+
+    @OnStopped
+    public final void closeClient() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+                kerberosUser = null;
+            } catch (final KerberosLoginException e) {
+                getLogger().debug("Error logging out keytab user", e);

Review Comment:
   I'm not a Kerberos expert, but what I see from Kerberos code across NiFi is 
that when logout fails, we either rethrow the exception as a ProcessException 
or at least clear the reference to the kerberosUser in a finally block. I would 
probably prefer the latter.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
+@CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. The processor supports only V2 table format." +
+        "The incoming data sets are parsed with Record Reader Controller 
Service and ingested into an Iceberg table using the configured catalog service 
and provided table information." +
+        "It is important that the incoming records and the Iceberg table must 
have matching schemas and the target Iceberg table should already exist. " +
+        "The file format of the written data files are defined in the 
'write.format.default' table property, if it is not provided then parquet 
format will be used." +
+        "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile")
+})
+public class PutIceberg extends KerberosAwareBaseProcessor {
+
// FlowFile attribute written on success: number of records ingested.
public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";

static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();

static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
        .name("catalog-service")
        .displayName("Catalog Service")
        .description("Specifies the Controller Service to use for handling references to table’s metadata files.")
        .identifiesControllerService(IcebergCatalogService.class)
        .required(true)
        .build();

// Supports Expression Language (evaluated in initializeTable).
static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
        .name("catalog-namespace")
        .displayName("Catalog namespace")
        .description("The namespace of the catalog.")
        .required(true)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();

// Supports Expression Language (evaluated in initializeTable).
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
        .name("table-name")
        .displayName("Iceberg table name")
        .description("The name of the Iceberg table.")
        .required(true)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
        .build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, "
                + "such as an invalid data or schema.")
        .build();

private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
        RECORD_READER,
        CATALOG,
        TABLE_NAME,
        CATALOG_NAMESPACE,
        KERBEROS_USER_SERVICE
));

public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
        REL_SUCCESS,
        REL_FAILURE
)));
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new HashSet<>();
+        if (validationContext.getProperty(TABLE_NAME).isSet() && 
validationContext.getProperty(CATALOG_NAMESPACE).isSet() && 
validationContext.getProperty(CATALOG).isSet()) {
+            final Table table = initializeTable(validationContext);
+
+            if (!validateTableVersion(table)) {
+                validationResults.add(new 
ValidationResult.Builder().explanation("The provided table has incompatible 
table format. V1 table format is not supported.").build());
+            }
+        }
+
+        return validationResults;
+    }
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    // Immutable list of the processor's configurable properties.
    return PROPERTIES;
}
+
@Override
public Set<Relationship> getRelationships() {
    // Immutable set: success and failure.
    return RELATIONSHIPS;
}
+
+    @Override
+    public void doOnTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Table table = initializeTable(context);
+
+        final IcebergFileCommitter fileCommitter = new 
IcebergFileCommitter(table);
+
+        IcebergTaskWriterFactory taskWriterFactory = null;
+        TaskWriter<Record> taskWriter = null;
+        int recordCount = 0;
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            //The first record is needed from the incoming set to get the 
schema and initialize the task writer.
+            Record firstRecord = reader.nextRecord();
+            if (firstRecord != null) {
+                taskWriterFactory = new IcebergTaskWriterFactory(table, 
firstRecord.getSchema());
+                taskWriterFactory.initialize(table.spec().specId(), 
flowFile.getId());
+                taskWriter = taskWriterFactory.create();
+
+                taskWriter.write(firstRecord);
+                recordCount++;
+
+                //Process the remaining records
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    taskWriter.write(record);
+                    recordCount++;
+                }
+
+                WriteResult result = taskWriter.complete();

Review Comment:
   This can be final.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/NifiRecordWrapper.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.StructLike;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+/**
+ * Class to wrap and adapt {@link Record} to Iceberg {@link StructLike} for 
partition handling usage like {@link
+ * org.apache.iceberg.PartitionKey#partition(StructLike)}
+ */
+public class NifiRecordWrapper implements StructLike {
+
// Per-position NiFi data types, taken from the schema at construction time.
private final DataType[] types;

// Per-position accessor functions (e.g. temporal conversions); a null entry
// means the raw record value is returned unchanged by get().
private final BiFunction<Record, Integer, ?>[] getters;

// The record currently wrapped; replaced via wrap() so the wrapper is reusable.
private Record record = null;
+
// Pre-computes the type array and the type-specific getters once per schema so
// that repeated get() calls do not re-resolve conversions for every record.
// Unchecked: generic BiFunction array creation via Stream.toArray.
@SuppressWarnings("unchecked")
public NifiRecordWrapper(RecordSchema schema) {
    this.types = schema.getDataTypes().toArray(new DataType[0]);
    this.getters = Stream.of(types).map(NifiRecordWrapper::getter).toArray(BiFunction[]::new);
}
+
// Replaces the wrapped record and returns this wrapper to allow call chaining.
public NifiRecordWrapper wrap(Record record) {
    this.record = record;
    return this;
}
+
@Override
public int size() {
    // Field count is fixed by the schema supplied at construction time.
    return types.length;
}
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+        if (record.getSchema().getField(pos) == null) {
+            return null;
+        } else if (getters[pos] != null) {
+            return javaClass.cast(getters[pos].apply(record, pos));
+        }
+
+        return 
javaClass.cast(record.getValue(record.getSchema().getField(pos)));
+    }
+
// Mutation is intentionally unsupported: this wrapper is a read-only adapter
// used for partition handling (see class javadoc).
@Override
public <T> void set(int i, T t) {
    throw new UnsupportedOperationException("Record wrapper update is unsupported.");
}
+
+    private static BiFunction<Record, Integer, ?> getter(DataType type) {
+        if (type.equals(RecordFieldType.TIMESTAMP.getDataType())) {
+            return (row, pos) -> {
+                final RecordField field = row.getSchema().getField(pos);
+                final Object value = row.getValue(field);
+
+                Timestamp timestamp = DataTypeUtils.toTimestamp(value, () -> 
DataTypeUtils.getDateFormat(type.getFormat()), field.getFieldName());

Review Comment:
   Missing a few finals in this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to