This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4fa0299f8b NIFI-12130: Ability to configure snapshot properties via
dynamic attributes in PutIceberg
4fa0299f8b is described below
commit 4fa0299f8bf94d32c32f433d681d5db00d637aa9
Author: Mark Bathori <[email protected]>
AuthorDate: Fri Oct 6 16:02:55 2023 +0200
NIFI-12130: Ability to configure snapshot properties via dynamic attributes
in PutIceberg
Fix dynamic field expression language handling
Signed-off-by: Matt Burgess <[email protected]>
This closes #7849
---
.../nifi/processors/iceberg/IcebergUtils.java | 26 +++++++++++
.../apache/nifi/processors/iceberg/PutIceberg.java | 52 +++++++++++++++++++++-
.../additionalDetails.html | 7 +++
.../iceberg/TestPutIcebergCustomValidation.java | 24 ++++++++++
.../iceberg/TestPutIcebergWithHiveCatalog.java | 33 +++++++++-----
5 files changed, 130 insertions(+), 12 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
index 7a3db7de71..eead5ed61d 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
@@ -17,10 +17,15 @@
*/
package org.apache.nifi.processors.iceberg;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
public class IcebergUtils {
@@ -39,4 +44,25 @@ public class IcebergUtils {
}
return conf;
}
+
+    /**
+     * Collects every non-blank dynamic property from the context.
+     * <p>
+     * A property is included only when both its configured (raw) value and its value after attribute
+     * expression evaluation are non-blank. Each property's expression is evaluated exactly once, so the
+     * value that passes the blank check is the very value stored in the returned map.
+     *
+     * @param context  process context
+     * @param flowFile FlowFile to evaluate attribute expressions
+     * @return Map of dynamic property names to their evaluated values
+     */
+    public static Map<String, String> getDynamicProperties(ProcessContext context, FlowFile flowFile) {
+        return context.getProperties().entrySet().stream()
+                // keep dynamic properties that have a non-blank configured value
+                .filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue()))
+                // evaluate the expression once and carry the result along with the property name;
+                // SimpleImmutableEntry tolerates a null evaluated value, which is filtered out below
+                .map(e -> new java.util.AbstractMap.SimpleImmutableEntry<>(
+                        e.getKey().getName(),
+                        context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue()))
+                // drop properties whose evaluated value is blank
+                .filter(e -> StringUtils.isNotBlank(e.getValue()))
+                // convert to Map keys and evaluated property values
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 22b1ec5507..f94453789f 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.Tasks;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -39,6 +40,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static
org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
+import static
org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;
@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc",
"parquet", "avro"})
@CapabilityDescription("This processor uses Iceberg API to parse and load
records into Iceberg tables. " +
@@ -75,12 +78,19 @@ import static
org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFr
"The target Iceberg table should already exist and it must have
matching schemas with the incoming records, " +
"which means the Record Reader schema must contain all the Iceberg
schema fields, every additional field which is not present in the Iceberg
schema will be ignored. " +
"To avoid 'small file problem' it is recommended pre-appending a
MergeRecord processor.")
+// Documents the dynamic properties accepted by this processor: the property key (not its value) carries the 'snapshot-property.' prefix.
+@DynamicProperty(
+        name = "A custom key to add to the snapshot summary. The key must start with the 'snapshot-property.' prefix.",
+        value = "A custom value to add to the snapshot summary.",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds an entry with custom-key and corresponding value in the snapshot summary. The key format must be 'snapshot-property.custom-key'.")
@WritesAttributes({
@WritesAttribute(attribute = "iceberg.record.count", description =
"The number of records in the FlowFile.")
})
public class PutIceberg extends AbstractIcebergProcessor {
public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
+ public static final String ICEBERG_SNAPSHOT_SUMMARY_PREFIX =
"snapshot-property.";
+ public static final String ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID =
"nifi-flowfile-uuid";
static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
.name("record-reader")
@@ -116,6 +126,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
.required(true)
.build();
+
static final PropertyDescriptor FILE_FORMAT = new
PropertyDescriptor.Builder()
.name("file-format")
.displayName("File Format")
@@ -211,6 +222,25 @@ public class PutIceberg extends AbstractIcebergProcessor {
return RELATIONSHIPS;
}
+    /**
+     * Builds the descriptor for a user-defined (dynamic) property.
+     * <p>
+     * The attached validator inspects only the property name: it must begin with
+     * {@link #ICEBERG_SNAPSHOT_SUMMARY_PREFIX} and must contain at least one character after the prefix,
+     * because the remainder of the name is what gets used as the snapshot summary key.
+     *
+     * @param propertyDescriptorName name of the dynamic property
+     * @return an optional, dynamic descriptor supporting FlowFile attribute expression language
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator((subject, input, context) -> {
+                    final ValidationResult.Builder builder = new ValidationResult.Builder().subject(subject).input(input);
+                    if (!subject.startsWith(ICEBERG_SNAPSHOT_SUMMARY_PREFIX)) {
+                        builder.valid(false).explanation("Dynamic property key must begin with '" + ICEBERG_SNAPSHOT_SUMMARY_PREFIX + "'");
+                    } else if (subject.length() == ICEBERG_SNAPSHOT_SUMMARY_PREFIX.length()) {
+                        // a bare prefix would yield an empty summary key once the prefix is removed
+                        builder.valid(false).explanation("Dynamic property key must contain a custom key after '" + ICEBERG_SNAPSHOT_SUMMARY_PREFIX + "'");
+                    } else {
+                        builder.valid(true);
+                    }
+                    return builder.build();
+                })
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
final List<ValidationResult> problems = new ArrayList<>();
@@ -322,8 +352,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
* Appends the pending data files to the given {@link Table}.
*
* @param context processor context
- * @param table table to append
- * @param result datafiles created by the {@link TaskWriter}
+ * @param table table to append
+ * @param result datafiles created by the {@link TaskWriter}
*/
void appendDataFiles(ProcessContext context, FlowFile flowFile, Table
table, WriteResult result) {
final int numberOfCommitRetries =
context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions(flowFile).asInteger();
@@ -334,6 +364,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
final AppendFiles appender = table.newAppend();
Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
+ addSnapshotSummaryProperties(context, appender, flowFile);
+
Tasks.foreach(appender)
.exponentialBackoff(minimumCommitWaitTime,
maximumCommitWaitTime, maximumCommitDuration, 2.0)
.retry(numberOfCommitRetries)
@@ -341,6 +373,22 @@ public class PutIceberg extends AbstractIcebergProcessor {
.run(PendingUpdate::commit);
}
+    /**
+     * Registers snapshot summary entries on the pending append: first the FlowFile's uuid under
+     * {@link #ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID}, then one entry per dynamic property. Each dynamic
+     * property name is shortened by the length of {@link #ICEBERG_SNAPSHOT_SUMMARY_PREFIX} before it is
+     * used as the summary key.
+     *
+     * @param context  processor context
+     * @param appender table appender to set the snapshot summaries
+     * @param flowFile the FlowFile to get the uuid from
+     */
+    private void addSnapshotSummaryProperties(ProcessContext context, AppendFiles appender, FlowFile flowFile) {
+        final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        appender.set(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID, flowFileUuid);
+
+        final int prefixLength = ICEBERG_SNAPSHOT_SUMMARY_PREFIX.length();
+        getDynamicProperties(context, flowFile)
+                .forEach((propertyName, propertyValue) -> appender.set(propertyName.substring(prefixLength), propertyValue));
+    }
+
/**
* Determines the write file format from the requested value and the table
configuration.
*
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
index 06844910f4..71f471ec5a 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
@@ -54,5 +54,12 @@
The NiFi side retry logic is built on top of the Iceberg commit
retry logic which can be configured through table properties. See more: <a
href="https://iceberg.apache.org/docs/latest/configuration/#table-behavior-properties">Table
behavior properties</a>
</p>
+ <h3>Snapshot summary properties</h3>
+ <p>
+ The processor provides an option to add additional properties to
the snapshot summary using dynamic properties.
+ The additional property must have the 'snapshot-property.' prefix
in the dynamic property key but the actual entry will be inserted without it.
+ Each snapshot automatically gets the FlowFile's uuid in the
'nifi-flowfile-uuid' summary property.
+ </p>
+
</body>
</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
index 36f14cedb8..d6cf5bb91a 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
@@ -120,4 +120,28 @@ public class TestPutIcebergCustomValidation {
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.assertNotValid();
}
+
+    /**
+     * A dynamic property whose name does not begin with 'snapshot-property.' must leave the processor invalid.
+     */
+    @Test
+    public void testInvalidSnapshotSummaryDynamicProperty() throws InitializationException {
+        initRecordReader();
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+
+        // base processor configuration
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+
+        // dynamic property name without the expected prefix
+        final String dynamicPropertyName = "invalid.dynamic.property";
+        runner.setProperty(dynamicPropertyName, "test value");
+
+        runner.assertNotValid();
+    }
+
+    /**
+     * A dynamic property whose name begins with 'snapshot-property.' must keep the processor valid.
+     */
+    @Test
+    public void testValidSnapshotSummaryDynamicProperty() throws InitializationException {
+        initRecordReader();
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+
+        // base processor configuration
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+
+        // dynamic property name carrying the expected prefix
+        final String dynamicPropertyName = "snapshot-property.valid-property";
+        runner.setProperty(dynamicPropertyName, "test value");
+
+        runner.assertValid();
+    }
}
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index 05d140a829..0a0cd8ef96 100644
---
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -43,7 +43,6 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -54,11 +53,13 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID;
import static
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
import static
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
import static
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
@@ -171,8 +172,8 @@ public class TestPutIcebergWithHiveCatalog {
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
String tableLocation = new URI(table.location()).getPath();
- Assertions.assertTrue(table.spec().isPartitioned());
- Assertions.assertEquals("4",
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ assertTrue(table.spec().isPartitioned());
+ assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList(
@@ -208,8 +209,8 @@ public class TestPutIcebergWithHiveCatalog {
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
String tableLocation = new URI(table.location()).getPath();
- Assertions.assertTrue(table.spec().isPartitioned());
- Assertions.assertEquals("4",
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ assertTrue(table.spec().isPartitioned());
+ assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList(
@@ -246,8 +247,8 @@ public class TestPutIcebergWithHiveCatalog {
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
String tableLocation = new URI(table.location()).getPath();
- Assertions.assertTrue(table.spec().isPartitioned());
- Assertions.assertEquals("4",
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ assertTrue(table.spec().isPartitioned());
+ assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(tableLocation, 4);
validatePartitionFolders(tableLocation, Arrays.asList(
@@ -267,7 +268,8 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");
- Map<String,String> attributes = new HashMap<>();
+ runner.setProperty("snapshot-property.additional-summary-property",
"test summary property");
+ Map<String, String> attributes = new HashMap<>();
attributes.put("catalog.name", CATALOG_NAME);
attributes.put("table.name", TABLE_NAME);
attributes.put("max.filesize", "536870912"); // 512 MB
@@ -286,11 +288,12 @@ public class TestPutIcebergWithHiveCatalog {
runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
- Assertions.assertTrue(table.spec().isUnpartitioned());
- Assertions.assertEquals("4",
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+ assertTrue(table.spec().isUnpartitioned());
+ assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
assertProvenanceEvents();
+ assertSnapshotSummaryProperties(table,
Collections.singletonMap("additional-summary-property", "test summary
property"));
}
private void assertProvenanceEvents() {
@@ -300,4 +303,14 @@ public class TestPutIcebergWithHiveCatalog {
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" +
TABLE_NAME));
}
+
+    /**
+     * Checks the summary of the table's current snapshot: it must contain the FlowFile uuid entry and,
+     * for every given key, exactly the given value.
+     *
+     * @param table             table whose current snapshot summary is inspected
+     * @param summaryProperties expected summary entries (key and value)
+     */
+    private void assertSnapshotSummaryProperties(Table table, Map<String, String> summaryProperties) {
+        Map<String, String> snapshotSummary = table.currentSnapshot().summary();
+
+        assertTrue(snapshotSummary.containsKey(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID));
+
+        for (Map.Entry<String, String> entry : summaryProperties.entrySet()) {
+            // expected value first, actual value second, so failure messages read correctly
+            assertEquals(entry.getValue(), snapshotSummary.get(entry.getKey()));
+        }
+    }
}