This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f7be2a6095 NIFI-13808: Record attributes and content for clone, upload
and send. (#9486)
f7be2a6095 is described below
commit f7be2a6095053577953a09778fded3a65c4c35e0
Author: Bob Paulin <[email protected]>
AuthorDate: Thu Jan 2 15:15:52 2025 -0600
NIFI-13808: Record attributes and content for clone, upload and send.
(#9486)
---
.../repository/StandardProcessSession.java | 13 +++++++------
.../repository/StandardProvenanceReporter.java | 5 ++++-
.../repository/StandardProvenanceReporterTest.java | 22 +++++++++++++++++++++-
3 files changed, 32 insertions(+), 8 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index abb48dae26..5991516d1a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1007,8 +1007,9 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
// the representation of the FlowFile as it is
committed, as this is the only way in which it really
// exists in our system -- all other
representations are volatile representations that have not been
// exposed.
- final boolean isUpdateAttributes =
rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType()
!= ProvenanceEventType.UPLOAD;
- return enrich(rawEvent, flowFileRecordMap,
checkpoint.records, isUpdateAttributes, commitNanos);
+ final boolean isUpdateAttributesAndContent =
rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType()
!= ProvenanceEventType.UPLOAD
+ && rawEvent.getEventType() !=
ProvenanceEventType.CLONE;
+ return enrich(rawEvent, flowFileRecordMap,
checkpoint.records, isUpdateAttributesAndContent, commitNanos);
} else if (autoTermIterator != null &&
autoTermIterator.hasNext()) {
return enrich(autoTermIterator.next(),
flowFileRecordMap, checkpoint.records, true, commitNanos);
}
@@ -1085,13 +1086,13 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
private ProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String,
FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord>
records,
- final boolean updateAttributes, final long commitNanos) {
+ final boolean updateAttributesAndContent, final long commitNanos) {
final ProvenanceEventBuilder recordBuilder =
context.createProvenanceEventBuilder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile =
flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
final StandardRepositoryRecord repoRecord =
records.get(eventFlowFile.getId());
- if (repoRecord.getCurrent() != null &&
repoRecord.getCurrentClaim() != null) {
+ if (updateAttributesAndContent && repoRecord.getCurrent() != null
&& repoRecord.getCurrentClaim() != null) {
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
final long currentOffset = repoRecord.getCurrentClaimOffset();
final long size = eventFlowFile.getSize();
@@ -1100,7 +1101,7 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(),
resourceClaim.getSection(), resourceClaim.getId(), currentOffset +
currentClaim.getOffset(), size);
}
- if (repoRecord.getOriginal() != null &&
repoRecord.getOriginalClaim() != null) {
+ if (updateAttributesAndContent && repoRecord.getOriginal() != null
&& repoRecord.getOriginalClaim() != null) {
final ContentClaim originalClaim =
repoRecord.getOriginalClaim();
final long originalOffset =
repoRecord.getOriginal().getContentClaimOffset();
final long originalSize = repoRecord.getOriginal().getSize();
@@ -1114,7 +1115,7 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
}
- if (updateAttributes) {
+ if (updateAttributesAndContent) {
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(),
repoRecord.getUpdatedAttributes());
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 3ac141368b..832b9bba1b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -457,7 +457,10 @@ public class StandardProvenanceReporter implements
InternalProvenanceReporter {
final ProvenanceEventBuilder eventBuilder = build(parent,
ProvenanceEventType.CLONE);
eventBuilder.addChildFlowFile(child);
eventBuilder.addParentFlowFile(parent);
- events.add(eventBuilder.build());
+ final ProvenanceEventRecord eventRecord = eventBuilder.build();
+ final ProvenanceEventRecord enriched = eventEnricher == null ?
eventRecord : eventEnricher.enrich(eventRecord, parent, System.nanoTime());
+
+ events.add(enriched);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event", e);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
index 3bb9a53e8e..d636f3224e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProvenanceReporterTest.java
@@ -28,6 +28,11 @@ import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
class StandardProvenanceReporterTest {
@@ -46,4 +51,19 @@ class StandardProvenanceReporterTest {
assertNotNull(record);
assertEquals("These are details", record.getDetails());
}
-}
\ No newline at end of file
+
+ @Test
+ public void testEnrichEvents() {
+ final ProvenanceEventRepository mockRepo =
Mockito.mock(ProvenanceEventRepository.class);
+ final ProvenanceEventEnricher enricher =
Mockito.mock(ProvenanceEventEnricher.class);
+ final StandardProvenanceReporter reporter = new
StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, enricher);
+ Mockito.when(mockRepo.eventBuilder()).thenReturn(new
StandardProvenanceEventRecord.Builder());
+
+ final FlowFile flowFile = new
StandardFlowFileRecord.Builder().id(10L).addAttribute("uuid", "10").build();
+ final FlowFile childFlowFile = new
StandardFlowFileRecord.Builder().id(11L).addAttribute("uuid", "11").build();
+ reporter.send(flowFile, "test://noop");
+ reporter.upload(flowFile, 0, "test://noop");
+ reporter.clone(flowFile, childFlowFile);
+ verify(enricher, times(3)).enrich(any(), eq(flowFile), anyLong());
+ }
+}