This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 67767b6550 NIFI-12130: Ability to configure snapshot properties via 
dynamic attributes in PutIceberg
67767b6550 is described below

commit 67767b6550ccce6dceae7aeb61b6913bbd2afb34
Author: Mark Bathori <[email protected]>
AuthorDate: Fri Oct 6 16:02:55 2023 +0200

    NIFI-12130: Ability to configure snapshot properties via dynamic attributes 
in PutIceberg
    
    Fix dynamic field expression language handling
    
    Signed-off-by: Matt Burgess <[email protected]>
---
 .../nifi/processors/iceberg/IcebergUtils.java      | 26 +++++++++++
 .../apache/nifi/processors/iceberg/PutIceberg.java | 52 +++++++++++++++++++++-
 .../additionalDetails.html                         |  7 +++
 .../iceberg/TestPutIcebergCustomValidation.java    | 24 ++++++++++
 .../iceberg/TestPutIcebergWithHiveCatalog.java     | 33 +++++++++-----
 5 files changed, 130 insertions(+), 12 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
index 7a3db7de71..eead5ed61d 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
@@ -17,10 +17,15 @@
  */
 package org.apache.nifi.processors.iceberg;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
 
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 public class IcebergUtils {
 
@@ -39,4 +44,25 @@ public class IcebergUtils {
         }
         return conf;
     }
+
+    /**
+     * Collects the dynamic properties whose configured value and evaluated value are both non-blank.
+     * <p>
+     * Each property's expression is evaluated exactly once against the FlowFile, so the value that
+     * passes the blank check is the very value stored in the returned map.
+     *
+     * @param context  process context supplying the configured properties
+     * @param flowFile FlowFile to evaluate attribute expressions
+     * @return Map of dynamic property names to their evaluated, non-blank values
+     */
+    public static Map<String, String> getDynamicProperties(ProcessContext context, FlowFile flowFile) {
+        return context.getProperties().entrySet().stream()
+                // keep only dynamic properties that have a non-blank configured value
+                .filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue()))
+                // evaluate once and carry {name, evaluated value} forward instead of re-evaluating in the collector
+                .map(e -> new String[] {
+                        e.getKey().getName(),
+                        context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue()
+                })
+                // drop properties whose expression evaluated to a blank value
+                .filter(nameAndValue -> StringUtils.isNotBlank(nameAndValue[1]))
+                .collect(Collectors.toMap(nameAndValue -> nameAndValue[0], nameAndValue -> nameAndValue[1]));
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 22b1ec5507..f94453789f 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.util.Tasks;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -39,6 +40,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static 
org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
+import static 
org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;
 
 @Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
 @CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. " +
@@ -75,12 +78,19 @@ import static 
org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFr
         "The target Iceberg table should already exist and it must have 
matching schemas with the incoming records, " +
         "which means the Record Reader schema must contain all the Iceberg 
schema fields, every additional field which is not present in the Iceberg 
schema will be ignored. " +
         "To avoid 'small file problem' it is recommended pre-appending a 
MergeRecord processor.")
+@DynamicProperty(
+        name = "A custom key to add to the snapshot summary. The value must 
start with 'snapshot-property.' prefix.",
+        value = "A custom value to add to the snapshot summary.",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds an entry with custom-key and corresponding value 
in the snapshot summary. The key format must be 
'snapshot-property.custom-key'.")
 @WritesAttributes({
         @WritesAttribute(attribute = "iceberg.record.count", description = 
"The number of records in the FlowFile.")
 })
 public class PutIceberg extends AbstractIcebergProcessor {
 
     public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+    public static final String ICEBERG_SNAPSHOT_SUMMARY_PREFIX = 
"snapshot-property.";
+    public static final String ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID = 
"nifi-flowfile-uuid";
 
     static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
             .name("record-reader")
@@ -116,6 +126,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             
.defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
             .required(true)
             .build();
+
     static final PropertyDescriptor FILE_FORMAT = new 
PropertyDescriptor.Builder()
             .name("file-format")
             .displayName("File Format")
@@ -211,6 +222,25 @@ public class PutIceberg extends AbstractIcebergProcessor {
         return RELATIONSHIPS;
     }
 
+    /**
+     * Builds the descriptor used for any user-defined (dynamic) property. The descriptor is optional,
+     * supports expression language against FlowFile attributes, and is only valid when the property
+     * name begins with {@link #ICEBERG_SNAPSHOT_SUMMARY_PREFIX}.
+     *
+     * @param propertyDescriptorName name of the dynamic property
+     * @return descriptor for the dynamic property
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .dynamic(true)
+                .required(false)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .addValidator((subject, input, context) -> {
+                    // the check is on the property name (subject), not on its value
+                    final boolean hasPrefix = subject.startsWith(ICEBERG_SNAPSHOT_SUMMARY_PREFIX);
+                    final ValidationResult.Builder result = new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .valid(hasPrefix);
+                    if (!hasPrefix) {
+                        result.explanation("Dynamic property key must begin with '" + ICEBERG_SNAPSHOT_SUMMARY_PREFIX + "'");
+                    }
+                    return result.build();
+                })
+                .build();
+    }
+
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
         final List<ValidationResult> problems = new ArrayList<>();
@@ -322,8 +352,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
      * Appends the pending data files to the given {@link Table}.
      *
      * @param context processor context
-     * @param table  table to append
-     * @param result datafiles created by the {@link TaskWriter}
+     * @param table   table to append
+     * @param result  datafiles created by the {@link TaskWriter}
      */
     void appendDataFiles(ProcessContext context, FlowFile flowFile, Table 
table, WriteResult result) {
         final int numberOfCommitRetries = 
context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions(flowFile).asInteger();
@@ -334,6 +364,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
         final AppendFiles appender = table.newAppend();
         Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
 
+        addSnapshotSummaryProperties(context, appender, flowFile);
+
         Tasks.foreach(appender)
                 .exponentialBackoff(minimumCommitWaitTime, 
maximumCommitWaitTime, maximumCommitDuration, 2.0)
                 .retry(numberOfCommitRetries)
@@ -341,6 +373,22 @@ public class PutIceberg extends AbstractIcebergProcessor {
                 .run(PendingUpdate::commit);
     }
 
+    /**
+     * Populates the snapshot summary with the FlowFile's uuid and with one entry per dynamic property.
+     * The summary key of a dynamic property is its name with the 'snapshot-property.' prefix removed.
+     *
+     * @param context  processor context
+     * @param appender table appender that receives the summary entries
+     * @param flowFile the FlowFile supplying the uuid and the attributes for expression evaluation
+     */
+    private void addSnapshotSummaryProperties(ProcessContext context, AppendFiles appender, FlowFile flowFile) {
+        final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        appender.set(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID, flowFileUuid);
+
+        final int prefixLength = ICEBERG_SNAPSHOT_SUMMARY_PREFIX.length();
+        getDynamicProperties(context, flowFile).forEach(
+                (name, value) -> appender.set(name.substring(prefixLength), value));
+    }
+
     /**
      * Determines the write file format from the requested value and the table 
configuration.
      *
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
index 06844910f4..71f471ec5a 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
@@ -54,5 +54,12 @@
             The NiFi side retry logic is built on top of the Iceberg commit retry logic which can be configured through table properties. See more: <a href="https://iceberg.apache.org/docs/latest/configuration/#table-behavior-properties">Table behavior properties</a>
         </p>
 
+        <h3>Snapshot summary properties</h3>
+        <p>
+            The processor provides an option to add additional properties to 
the snapshot summary using dynamic properties.
+            The additional property must have the 'snapshot-property.' prefix 
in the dynamic property key but the actual entry will be inserted without it.
+            Each snapshot automatically gets the FlowFile's uuid in the 
'nifi-flowfile-uuid' summary property.
+        </p>
+
     </body>
 </html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
index 36f14cedb8..d6cf5bb91a 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
@@ -120,4 +120,28 @@ public class TestPutIcebergCustomValidation {
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.assertNotValid();
     }
+
+    /**
+     * A dynamic property whose name lacks the 'snapshot-property.' prefix must make the processor invalid.
+     */
+    @Test
+    public void testInvalidSnapshotSummaryDynamicProperty() throws InitializationException {
+        final String dynamicPropertyName = "invalid.dynamic.property";
+
+        initRecordReader();
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+
+        runner.setProperty(dynamicPropertyName, "test value");
+        runner.assertNotValid();
+    }
+
+    /**
+     * A dynamic property whose name carries the 'snapshot-property.' prefix must leave the processor valid.
+     */
+    @Test
+    public void testValidSnapshotSummaryDynamicProperty() throws InitializationException {
+        final String dynamicPropertyName = "snapshot-property.valid-property";
+
+        initRecordReader();
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+
+        runner.setProperty(dynamicPropertyName, "test value");
+        runner.assertValid();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index 05d140a829..0a0cd8ef96 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -43,7 +43,6 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -54,11 +53,13 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static 
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID;
 import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
 import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
 import static 
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
@@ -171,8 +172,8 @@ public class TestPutIcebergWithHiveCatalog {
         MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
 
         String tableLocation = new URI(table.location()).getPath();
-        Assertions.assertTrue(table.spec().isPartitioned());
-        Assertions.assertEquals("4", 
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        assertTrue(table.spec().isPartitioned());
+        assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
         validateNumberOfDataFiles(tableLocation, 3);
         validatePartitionFolders(tableLocation, Arrays.asList(
@@ -208,8 +209,8 @@ public class TestPutIcebergWithHiveCatalog {
         MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
 
         String tableLocation = new URI(table.location()).getPath();
-        Assertions.assertTrue(table.spec().isPartitioned());
-        Assertions.assertEquals("4", 
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        assertTrue(table.spec().isPartitioned());
+        assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
         validateNumberOfDataFiles(tableLocation, 3);
         validatePartitionFolders(tableLocation, Arrays.asList(
@@ -246,8 +247,8 @@ public class TestPutIcebergWithHiveCatalog {
         MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
 
         String tableLocation = new URI(table.location()).getPath();
-        Assertions.assertTrue(table.spec().isPartitioned());
-        Assertions.assertEquals("4", 
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        assertTrue(table.spec().isPartitioned());
+        assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
         validateNumberOfDataFiles(tableLocation, 4);
         validatePartitionFolders(tableLocation, Arrays.asList(
@@ -267,7 +268,8 @@ public class TestPutIcebergWithHiveCatalog {
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
         runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
         runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");
-        Map<String,String> attributes = new HashMap<>();
+        runner.setProperty("snapshot-property.additional-summary-property", 
"test summary property");
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("catalog.name", CATALOG_NAME);
         attributes.put("table.name", TABLE_NAME);
         attributes.put("max.filesize", "536870912"); // 512 MB
@@ -286,11 +288,12 @@ public class TestPutIcebergWithHiveCatalog {
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
         MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
 
-        Assertions.assertTrue(table.spec().isUnpartitioned());
-        Assertions.assertEquals("4", 
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        assertTrue(table.spec().isUnpartitioned());
+        assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
         validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
         assertProvenanceEvents();
+        assertSnapshotSummaryProperties(table, 
Collections.singletonMap("additional-summary-property", "test summary 
property"));
     }
 
     private void assertProvenanceEvents() {
@@ -300,4 +303,14 @@ public class TestPutIcebergWithHiveCatalog {
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + 
TABLE_NAME));
     }
+
+    /**
+     * Verifies that the table's current snapshot summary contains the FlowFile uuid entry
+     * and every expected summary property.
+     *
+     * @param table             table whose current snapshot summary is inspected
+     * @param summaryProperties expected summary keys and values
+     */
+    private void assertSnapshotSummaryProperties(Table table, Map<String, String> summaryProperties) {
+        Map<String, String> snapshotSummary = table.currentSnapshot().summary();
+
+        assertTrue(snapshotSummary.containsKey(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID));
+
+        for (Map.Entry<String, String> entry : summaryProperties.entrySet()) {
+            // assertEquals takes (expected, actual); this order keeps the failure message accurate
+            assertEquals(entry.getValue(), snapshotSummary.get(entry.getKey()));
+        }
+    }
 }

Reply via email to