Repository: nifi Updated Branches: refs/heads/master b1f78d58a -> 473221368
NIFI-5420: Allow StandardProcessSession to calculate duration for provenance events This closes #2886. Signed-off-by: Mark Payne <marka...@hotmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/47322136 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/47322136 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/47322136 Branch: refs/heads/master Commit: 473221368c75d371cbabf7e68b0bfb62fbe56c00 Parents: b1f78d5 Author: Matthew Burgess <mattyb...@apache.org> Authored: Thu Jul 12 17:13:32 2018 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Fri Jul 27 09:13:50 2018 -0400 ---------------------------------------------------------------------- .../repository/ProvenanceEventEnricher.java | 3 ++- .../repository/StandardProcessSession.java | 28 +++++++++++--------- .../repository/StandardProvenanceReporter.java | 6 +++-- .../repository/TestStandardProcessSession.java | 23 ++++++++++++++++ .../repository/StandardRepositoryRecord.java | 5 ++++ 5 files changed, 49 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java index 323bfb0..f0f6bc4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java @@ -26,8 +26,9 @@ public interface ProvenanceEventEnricher { * * @param record record * @param flowFile flowfile + * @param commitNanos the time (in nanoseconds) when the associated session was committed * @return new event record */ - ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile); + ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile, long commitNanos); } http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index ea4969c..7255645 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -737,6 +737,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile); } + final long commitNanos = System.nanoTime(); final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents; final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() { final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator(); @@ -761,9 +762,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // the representation of the FlowFile as it is committed, as this is the only way in which it really // exists in our system -- all other representations are volatile representations that have not been // exposed. - return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND); + return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND, commitNanos); } else if (autoTermIterator != null && autoTermIterator.hasNext()) { - return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true); + return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos); } throw new NoSuchElementException(); @@ -796,7 +797,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) { + public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) { verifyTaskActive(); final StandardRepositoryRecord repoRecord = records.get(flowFile); @@ -829,11 +830,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes()); + if (rawEvent.getEventDuration() < 0) { + recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos())); + } return recordBuilder.build(); } private StandardProvenanceEventRecord enrich( - final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { + final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, + final boolean updateAttributes, final long commitNanos) { final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { @@ -861,18 +866,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (originalQueue != null) { recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier()); } - } - if (updateAttributes) { - final FlowFileRecord flowFileRecord = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); - if (flowFileRecord != null) { - final StandardRepositoryRecord record = records.get(flowFileRecord); - if (record != null) { - recordBuilder.setAttributes(record.getOriginalAttributes(), record.getUpdatedAttributes()); - } + if (updateAttributes) { + recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes()); } - } + if (rawEvent.getEventDuration() < 0) { + recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos())); + } + } return recordBuilder.build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 7d14ee3..e956d50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -209,7 +209,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter { public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) { try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); - final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile); + // If the transmissionMillis field has been populated, use zero as the value of commitNanos (the call to System.nanoTime() is expensive but the value will be ignored). + final long commitNanos = transmissionMillis < 0 ? System.nanoTime() : 0L; + final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile, commitNanos); if (force) { repository.registerEvent(enriched); @@ -226,7 +228,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void send(final FlowFile flowFile, final String transitUri, final boolean force) { - send(flowFile, transitUri, -1L, true); + send(flowFile, transitUri, -1L, force); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 4059557..5667094 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -929,6 +929,29 @@ public class TestStandardProcessSession { } @Test + public void testProvenanceEventsHaveDurationFromSession() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord); + + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); + session.getProvenanceReporter().fork(orig, Collections.singletonList(newFlowFile), 0L); + session.getProvenanceReporter().fetch(newFlowFile, "nowhere://"); + session.getProvenanceReporter().send(newFlowFile, "nowhere://"); + session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 100000); + assertNotNull(events); + assertEquals(3, events.size()); // FETCH, SEND, and FORK + events.forEach((event) -> assertTrue(event.getEventDuration() > -1)); + } + + @Test public void testUuidAttributeCannotBeUpdated() { String originalUuid = "11111111-1111-1111-1111-111111111111"; final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java index 8aa1caf..6f045e5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -38,6 +38,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { private final Map<String, String> updatedAttributes = new HashMap<>(); private final Map<String, String> originalAttributes; private List<ContentClaim> transientClaims; + private final long startNanos = System.nanoTime(); /** * Creates a new record which has no original claim or flow file - it is entirely new @@ -218,4 +219,8 @@ public class StandardRepositoryRecord implements RepositoryRecord { } transientClaims.add(claim); } + + public long getStartNanos() { + return startNanos; + } }