This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f7be2a6095 NIFI-13808: Record attributes and content for clone, upload 
and send. (#9486)
f7be2a6095 is described below

commit f7be2a6095053577953a09778fded3a65c4c35e0
Author: Bob Paulin <[email protected]>
AuthorDate: Thu Jan 2 15:15:52 2025 -0600

    NIFI-13808: Record attributes and content for clone, upload and send. 
(#9486)
---
 .../repository/StandardProcessSession.java         | 13 +++++++------
 .../repository/StandardProvenanceReporter.java     |  5 ++++-
 .../repository/StandardProvenanceReporterTest.java | 22 +++++++++++++++++++++-
 3 files changed, 32 insertions(+), 8 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index abb48dae26..5991516d1a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1007,8 +1007,9 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
                             // the representation of the FlowFile as it is 
committed, as this is the only way in which it really
                             // exists in our system -- all other 
representations are volatile representations that have not been
                             // exposed.
-                            final boolean isUpdateAttributes = 
rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() 
!= ProvenanceEventType.UPLOAD;
-                            return enrich(rawEvent, flowFileRecordMap, 
checkpoint.records, isUpdateAttributes, commitNanos);
+                            final boolean isUpdateAttributesAndContent = 
rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() 
!= ProvenanceEventType.UPLOAD
+                                    && rawEvent.getEventType() != 
ProvenanceEventType.CLONE;
+                            return enrich(rawEvent, flowFileRecordMap, 
checkpoint.records, isUpdateAttributesAndContent, commitNanos);
                         } else if (autoTermIterator != null && 
autoTermIterator.hasNext()) {
                             return enrich(autoTermIterator.next(), 
flowFileRecordMap, checkpoint.records, true, commitNanos);
                         }
@@ -1085,13 +1086,13 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
 
     private ProvenanceEventRecord enrich(
         final ProvenanceEventRecord rawEvent, final Map<String, 
FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> 
records,
-        final boolean updateAttributes, final long commitNanos) {
+        final boolean updateAttributesAndContent, final long commitNanos) {
         final ProvenanceEventBuilder recordBuilder = 
context.createProvenanceEventBuilder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = 
flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
             final StandardRepositoryRecord repoRecord = 
records.get(eventFlowFile.getId());
 
-            if (repoRecord.getCurrent() != null && 
repoRecord.getCurrentClaim() != null) {
+            if (updateAttributesAndContent && repoRecord.getCurrent() != null 
&& repoRecord.getCurrentClaim() != null) {
                 final ContentClaim currentClaim = repoRecord.getCurrentClaim();
                 final long currentOffset = repoRecord.getCurrentClaimOffset();
                 final long size = eventFlowFile.getSize();
@@ -1100,7 +1101,7 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
                 
recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), currentOffset + 
currentClaim.getOffset(), size);
             }
 
-            if (repoRecord.getOriginal() != null && 
repoRecord.getOriginalClaim() != null) {
+            if (updateAttributesAndContent && repoRecord.getOriginal() != null 
&& repoRecord.getOriginalClaim() != null) {
                 final ContentClaim originalClaim = 
repoRecord.getOriginalClaim();
                 final long originalOffset = 
repoRecord.getOriginal().getContentClaimOffset();
                 final long originalSize = repoRecord.getOriginal().getSize();
@@ -1114,7 +1115,7 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
                 
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
             }
 
-            if (updateAttributes) {
+            if (updateAttributesAndContent) {
                 
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), 
repoRecord.getUpdatedAttributes());
             }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 3ac141368b..832b9bba1b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -457,7 +457,10 @@ public class StandardProvenanceReporter implements 
InternalProvenanceReporter {
             final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.CLONE);
             eventBuilder.addChildFlowFile(child);
             eventBuilder.addParentFlowFile(parent);
-            events.add(eventBuilder.build());
+            final ProvenanceEventRecord eventRecord = eventBuilder.build();
+            final ProvenanceEventRecord enriched = eventEnricher == null ? 
eventRecord : eventEnricher.enrich(eventRecord, parent, System.nanoTime());
+
+            events.add(enriched);
         } catch (final Exception e) {
             logger.error("Failed to generate Provenance Event", e);
         }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
index 3bb9a53e8e..d636f3224e 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
@@ -28,6 +28,11 @@ import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 class StandardProvenanceReporterTest {
 
@@ -46,4 +51,19 @@ class StandardProvenanceReporterTest {
         assertNotNull(record);
         assertEquals("These are details", record.getDetails());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testEnrichEvents() {
+        final ProvenanceEventRepository mockRepo = 
Mockito.mock(ProvenanceEventRepository.class);
+        final ProvenanceEventEnricher enricher = 
Mockito.mock(ProvenanceEventEnricher.class);
+        final StandardProvenanceReporter reporter = new 
StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, enricher);
+        Mockito.when(mockRepo.eventBuilder()).thenReturn(new 
StandardProvenanceEventRecord.Builder());
+
+        final FlowFile flowFile = new 
StandardFlowFileRecord.Builder().id(10L).addAttribute("uuid", "10").build();
+        final FlowFile childFlowFile = new 
StandardFlowFileRecord.Builder().id(11L).addAttribute("uuid", "11").build();
+        reporter.send(flowFile, "test://noop");
+        reporter.upload(flowFile, 0, "test://noop");
+        reporter.clone(flowFile, childFlowFile);
+        verify(enricher, times(3)).enrich(any(), eq(flowFile), anyLong());
+    }
+}

Reply via email to