This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 9b15bbf6b9 NIFI-12054: PutIceberg should produce a provenance send event
9b15bbf6b9 is described below
commit 9b15bbf6b9005f9a3e1aece0932cdf6e517086e5
Author: Mark Bathori <[email protected]>
AuthorDate: Thu Sep 14 14:01:36 2023 +0200
NIFI-12054: PutIceberg should produce a provenance send event
This closes #7690.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../org/apache/nifi/processors/iceberg/PutIceberg.java | 3 +++
.../iceberg/TestPutIcebergWithHiveCatalog.java | 16 ++++++++++++++++
2 files changed, 19 insertions(+)
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 360ea17f1b..02bd0b074f 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -234,6 +234,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
@Override
public void doOnTrigger(ProcessContext context, ProcessSession session,
FlowFile flowFile) throws ProcessException {
+ final long startNanos = System.nanoTime();
final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String fileFormat = context.getProperty(FILE_FORMAT).getValue();
final String maximumFileSize =
context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue();
@@ -281,6 +282,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
}
flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT,
String.valueOf(recordCount));
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, table.location(),
transferMillis);
session.transfer(flowFile, REL_SUCCESS);
}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index c672d90e8b..bc159ef470 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -32,6 +32,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
@@ -60,6 +62,8 @@ import static
org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT
import static
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
import static
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
import static
org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
@DisabledOnOs(WINDOWS)
@@ -174,6 +178,7 @@ public class TestPutIcebergWithHiveCatalog {
validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList(
"department_bucket=0", "department_bucket=1",
"department_bucket=2"));
+ assertProvenanceEvents();
}
@ParameterizedTest
@@ -211,6 +216,7 @@ public class TestPutIcebergWithHiveCatalog {
validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList(
"department=Finance", "department=Marketing",
"department=Sales"));
+ assertProvenanceEvents();
}
@ParameterizedTest
@@ -253,6 +259,7 @@ public class TestPutIcebergWithHiveCatalog {
"name=Joana/department=Sales/",
"name=John/department=Finance/"
));
+ assertProvenanceEvents();
}
@ParameterizedTest
@@ -287,5 +294,14 @@ public class TestPutIcebergWithHiveCatalog {
Assertions.assertEquals("4",
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
+ assertProvenanceEvents();
+ }
+
+ private void assertProvenanceEvents() {
+ final List<ProvenanceEventRecord> provenanceEvents =
runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+ assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" +
TABLE_NAME));
}
}