Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 28abc9acb -> 739baa2e5


NIFI-37: Ensure that provenance events are emitted only against FlowFiles
that are known to the session emitting them.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/739baa2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/739baa2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/739baa2e

Branch: refs/heads/develop
Commit: 739baa2e5711b0677a451c4acadb10e232d5bf2b
Parents: 28abc9a
Author: Mark Payne <[email protected]>
Authored: Tue Jun 16 20:32:22 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Jun 17 07:56:49 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/util/MockProcessSession.java    |  40 +-
 .../nifi/util/MockProvenanceReporter.java       | 365 ++++++++++++++++---
 .../apache/nifi/util/MockSessionFactory.java    |   8 +-
 .../apache/nifi/util/SharedSessionState.java    |  26 +-
 .../nifi/util/StandardProcessorTestRunner.java  |  34 +-
 .../java/org/apache/nifi/util/TestRunner.java   |  15 +
 .../repository/StandardProcessSession.java      |  27 +-
 .../repository/StandardProvenanceReporter.java  |  32 +-
 .../TestStandardProvenanceReporter.java         |   7 +-
 .../nifi/processors/kafka/TestPutKafka.java     |  52 +--
 .../prioritizer/NewestFirstPrioritizerTest.java |  16 +-
 .../prioritizer/OldestFirstPrioritizerTest.java |  16 +-
 .../PriorityAttributePrioritizerTest.java       |  27 +-
 13 files changed, 499 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index e9bb778..13198dc 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -40,11 +40,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
-import org.junit.Assert;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
@@ -54,6 +54,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.provenance.ProvenanceReporter;
+import org.junit.Assert;
 
 public class MockProcessSession implements ProcessSession {
 
@@ -65,14 +66,16 @@ public class MockProcessSession implements ProcessSession {
     private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
     private final SharedSessionState sharedState;
     private final Map<String, Long> counterMap = new HashMap<>();
+    private final ProvenanceReporter provenanceReporter;
 
     private boolean committed = false;
     private boolean rolledback = false;
     private int removedCount = 0;
 
-    public MockProcessSession(final SharedSessionState sharedState) {
+    public MockProcessSession(final SharedSessionState sharedState, final 
Processor processor) {
         this.sharedState = sharedState;
         this.processorQueue = sharedState.getFlowFileQueue();
+        provenanceReporter = new MockProvenanceReporter(this, sharedState, 
processor.getIdentifier(), processor.getClass().getSimpleName());
     }
 
     @Override
@@ -194,7 +197,7 @@ public class MockProcessSession implements ProcessSession {
 
         try {
             out.write(mock.getData());
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new FlowFileAccessException(e.toString(), e);
         }
     }
@@ -409,7 +412,7 @@ public class MockProcessSession implements ProcessSession {
         final ByteArrayInputStream bais = new 
ByteArrayInputStream(mock.getData());
         try {
             callback.process(bais);
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new ProcessException(e.toString(), e);
         }
     }
@@ -766,7 +769,7 @@ public class MockProcessSession implements ProcessSession {
         if (source == null || destination == null || source == destination) {
             return destination; //don't need to inherit from ourselves
         }
-        FlowFile updated = putAllAttributes(destination, 
source.getAttributes());
+        final FlowFile updated = putAllAttributes(destination, 
source.getAttributes());
         getProvenanceReporter().fork(source, 
Collections.singletonList(updated));
         return updated;
     }
@@ -803,7 +806,7 @@ public class MockProcessSession implements ProcessSession {
             }
         }
 
-        FlowFile updated = putAllAttributes(destination, 
intersectAttributes(sources));
+        final FlowFile updated = putAllAttributes(destination, 
intersectAttributes(sources));
         getProvenanceReporter().join(sources, updated);
         return updated;
     }
@@ -982,7 +985,7 @@ public class MockProcessSession implements ProcessSession {
 
     @Override
     public ProvenanceReporter getProvenanceReporter() {
-        return sharedState.getProvenanceReporter();
+        return provenanceReporter;
     }
 
     @Override
@@ -997,4 +1000,27 @@ public class MockProcessSession implements ProcessSession 
{
         validateState(flowFile);
         return flowFile.getData();
     }
+
+    /**
+     * Checks if a FlowFile is known in this session. A FlowFile is known when
+     * this session tracks a current version under the same id and that
+     * version carries the same UUID attribute as the given FlowFile.
+     *
+     * @param flowFile
+     *            the FlowFile to check
+     * @return <code>true</code> if the FlowFile is known in this session,
+     *         <code>false</code> otherwise.
+     */
+    boolean isFlowFileKnown(final FlowFile flowFile) {
+        final FlowFile curFlowFile = currentVersions.get(flowFile.getId());
+        if (curFlowFile == null) {
+            return false;
+        }
+
+        // Compare the tracked version against the FlowFile that was passed in;
+        // reading both UUIDs from curFlowFile would make this check always pass.
+        final String curUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key());
+        final String providedUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+
+        return curUuid.equals(providedUuid);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
index 097eafd..8c9a320 100644
--- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
+++ 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
@@ -17,186 +17,437 @@
 package org.apache.nifi.util;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MockProvenanceReporter implements ProvenanceReporter {
+    private static final Logger logger = 
LoggerFactory.getLogger(MockProvenanceReporter.class);
+    private final MockProcessSession session;
+    private final String processorId;
+    private final String processorType;
+    private final SharedSessionState sharedSessionState;
+    private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
 
-    @Override
-    public void receive(FlowFile flowFile, String sourceSystemUri) {
-
+    public MockProvenanceReporter(final MockProcessSession session, final 
SharedSessionState sharedState, final String processorId, final String 
processorType) {
+        this.session = session;
+        this.sharedSessionState = sharedState;
+        this.processorId = processorId;
+        this.processorType = processorType;
     }
 
-    @Override
-    public void send(FlowFile flowFile, String destinationSystemUri) {
+    private void verifyFlowFileKnown(final FlowFile flowFile) {
+        if (session != null && !session.isFlowFileKnown(flowFile)) {
+            throw new FlowFileHandlingException(flowFile + " is not known to " 
+ session);
+        }
+    }
 
+    Set<ProvenanceEventRecord> getEvents() {
+        return Collections.unmodifiableSet(events);
     }
 
-    @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, boolean 
force) {
+    /**
+     * Removes the given event from the reporter
+     *
+     * @param event
+     *            event
+     */
+    void remove(final ProvenanceEventRecord event) {
+        events.remove(event);
+    }
 
+    void clear() {
+        events.clear();
     }
 
-    @Override
-    public void receive(FlowFile flowFile, String sourceSystemUri, long 
transmissionMillis) {
+    /**
+     * Generates a Join event for the given child and parents but does not
+     * register the event. This is useful so that a ProcessSession has the
+     * ability to de-dupe events, since one or more events may be created by 
the
+     * session itself, as well as by the Processor
+     *
+     * @param parents
+     *            parents
+     * @param child
+     *            child
+     * @return record
+     */
+    ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> 
parents, final FlowFile child) {
+        final ProvenanceEventBuilder eventBuilder = build(child, 
ProvenanceEventType.JOIN);
+        eventBuilder.addChildFlowFile(child);
 
-    }
+        for (final FlowFile parent : parents) {
+            eventBuilder.addParentFlowFile(parent);
+        }
 
-    @Override
-    public void receive(FlowFile flowFile, String sourceSystemUri, String 
sourceSystemFlowFileIdentifier) {
+        return eventBuilder.build();
+    }
 
+    ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final 
String details) {
+        return build(flowFile, 
ProvenanceEventType.DROP).setDetails(details).build();
     }
 
     @Override
-    public void receive(FlowFile flowFile, String sourceSystemUri, String 
sourceSystemFlowFileIdentifier, long transmissionMillis) {
-
+    public void receive(final FlowFile flowFile, final String transitUri) {
+        receive(flowFile, transitUri, -1L);
     }
 
     @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, long 
transmissionMillis) {
-
+    public void receive(FlowFile flowFile, String transitUri, String 
sourceSystemFlowFileIdentifier) {
+        receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L);
     }
 
     @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, long 
transmissionMillis, boolean force) {
-
+    public void receive(final FlowFile flowFile, final String transitUri, 
final long transmissionMillis) {
+        receive(flowFile, transitUri, null, transmissionMillis);
     }
 
     @Override
-    public void associate(FlowFile flowFile, String 
alternateIdentifierNamespace, String alternateIdentifier) {
-
+    public void receive(final FlowFile flowFile, final String transitUri, 
final String sourceSystemFlowFileIdentifier, final long transmissionMillis) {
+        receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, 
transmissionMillis);
     }
 
     @Override
-    public void fork(FlowFile parent, Collection<FlowFile> children) {
+    public void receive(final FlowFile flowFile, final String transitUri, 
final String sourceSystemFlowFileIdentifier, final String details, final long 
transmissionMillis) {
+        verifyFlowFileKnown(flowFile);
 
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.RECEIVE)
+                
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void fork(FlowFile parent, Collection<FlowFile> children, long 
forkDuration) {
-
+    public void send(final FlowFile flowFile, final String transitUri, final 
long transmissionMillis) {
+        send(flowFile, transitUri, transmissionMillis, true);
     }
 
     @Override
-    public void fork(FlowFile parent, Collection<FlowFile> children, String 
details) {
-
+    public void send(final FlowFile flowFile, final String transitUri) {
+        send(flowFile, transitUri, null, -1L, true);
     }
 
     @Override
-    public void fork(FlowFile parent, java.util.Collection<FlowFile> children, 
String details, long forkDuration) {
-
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details) {
+        send(flowFile, transitUri, details, -1L, true);
     }
 
     @Override
-    public void join(Collection<FlowFile> parents, FlowFile child) {
-
+    public void send(final FlowFile flowFile, final String transitUri, final 
long transmissionMillis, final boolean force) {
+        send(flowFile, transitUri, null, transmissionMillis, force);
     }
 
     @Override
-    public void join(Collection<FlowFile> parents, FlowFile child, long 
joinDuration) {
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details, final boolean force) {
+        send(flowFile, transitUri, details, -1L, force);
+    }
 
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details, final long transmissionMillis) {
+        send(flowFile, transitUri, details, transmissionMillis, true);
     }
 
     @Override
-    public void join(Collection<FlowFile> parents, FlowFile child, String 
details) {
+    public void send(final FlowFile flowFile, final String transitUri, final 
String details, final long transmissionMillis, final boolean force) {
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
+            if (force) {
+                
sharedSessionState.addProvenanceEvents(Collections.singleton(record));
+            } else {
+                events.add(record);
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
 
+    @Override
+    public void send(final FlowFile flowFile, final String transitUri, final boolean force) {
+        send(flowFile, transitUri, -1L, force); // honor the caller's force flag; no duration is known (-1)
     }
 
     @Override
-    public void join(java.util.Collection<FlowFile> parents, FlowFile child, 
String details, long joinDuration) {
+    public void associate(final FlowFile flowFile, final String 
alternateIdentifierNamespace, final String alternateIdentifier) {
+        try {
+            String trimmedNamespace = alternateIdentifierNamespace.trim();
+            if (trimmedNamespace.endsWith(":")) {
+                trimmedNamespace = trimmedNamespace.substring(0, 
trimmedNamespace.length() - 1);
+            }
+
+            String trimmedIdentifier = alternateIdentifier.trim();
+            if (trimmedIdentifier.startsWith(":")) {
+                if (trimmedIdentifier.length() == 1) {
+                    throw new IllegalArgumentException("Illegal 
alternateIdentifier: " + alternateIdentifier);
+                }
+                trimmedIdentifier = trimmedIdentifier.substring(1);
+            }
 
+            final String alternateIdentifierUri = trimmedNamespace + ":" + 
trimmedIdentifier;
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
-    @Override
-    public void clone(FlowFile parent, FlowFile child) {
+    ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) {
+        try {
+            final ProvenanceEventBuilder builder = build(flowFile, 
ProvenanceEventType.DROP);
+            if (reason != null) {
+                builder.setDetails("Discard reason: " + reason);
+            }
+            final ProvenanceEventRecord record = builder.build();
+            events.add(record);
+            return record;
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            return null;
+        }
+    }
 
+    void expire(final FlowFile flowFile, final String details) {
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.EXPIRE).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void modifyContent(FlowFile flowFile) {
-
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children) {
+        fork(parent, children, null, -1L);
     }
 
     @Override
-    public void modifyContent(FlowFile flowFile, String details) {
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final long forkDuration) {
+        fork(parent, children, null, forkDuration);
+    }
 
+    @Override
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final String details) {
+        fork(parent, children, details, -1L);
     }
 
     @Override
-    public void modifyContent(FlowFile flowFile, long processingMillis) {
+    public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final String details, final long forkDuration) {
+        verifyFlowFileKnown(parent);
+
+        try {
+            final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.FORK);
+            eventBuilder.addParentFlowFile(parent);
+            for (final FlowFile child : children) {
+                eventBuilder.addChildFlowFile(child);
+            }
 
+            if (forkDuration > -1L) {
+                eventBuilder.setEventDuration(forkDuration);
+            }
+
+            if (details != null) {
+                eventBuilder.setDetails(details);
+            }
+
+            events.add(eventBuilder.build());
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void modifyContent(FlowFile flowFile, String details, long 
processingMillis) {
-
+    public void join(final Collection<FlowFile> parents, final FlowFile child) 
{
+        join(parents, child, null, -1L);
     }
 
     @Override
-    public void modifyAttributes(FlowFile flowFile) {
+    public void join(final Collection<FlowFile> parents, final FlowFile child, 
final long joinDuration) {
+        join(parents, child, null, joinDuration);
+    }
 
+    @Override
+    public void join(final Collection<FlowFile> parents, final FlowFile child, 
final String details) {
+        join(parents, child, details, -1L);
     }
 
     @Override
-    public void modifyAttributes(FlowFile flowFile, String details) {
+    public void join(final Collection<FlowFile> parents, final FlowFile child, 
final String details, final long joinDuration) {
+        verifyFlowFileKnown(child);
+
+        try {
+            final ProvenanceEventBuilder eventBuilder = build(child, 
ProvenanceEventType.JOIN);
+            eventBuilder.addChildFlowFile(child);
+            eventBuilder.setDetails(details);
 
+            for (final FlowFile parent : parents) {
+                eventBuilder.addParentFlowFile(parent);
+            }
+
+            events.add(eventBuilder.build());
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void route(FlowFile flowFile, Relationship relationship) {
+    public void clone(final FlowFile parent, final FlowFile child) {
+        verifyFlowFileKnown(child);
 
+        try {
+            final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.CLONE);
+            eventBuilder.addChildFlowFile(child);
+            eventBuilder.addParentFlowFile(parent);
+            events.add(eventBuilder.build());
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void route(FlowFile flowFile, Relationship relationship, String 
details) {
-
+    public void modifyContent(final FlowFile flowFile) {
+        modifyContent(flowFile, null, -1L);
     }
 
     @Override
-    public void route(FlowFile flowFile, Relationship relationship, long 
processingDuration) {
+    public void modifyContent(final FlowFile flowFile, final String details) {
+        modifyContent(flowFile, details, -1L);
+    }
 
+    @Override
+    public void modifyContent(final FlowFile flowFile, final long 
processingMillis) {
+        modifyContent(flowFile, null, processingMillis);
     }
 
     @Override
-    public void route(FlowFile flowFile, Relationship relationship, String 
details, long processingDuration) {
+    public void modifyContent(final FlowFile flowFile, final String details, 
final long processingMillis) {
+        verifyFlowFileKnown(flowFile);
 
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void create(FlowFile flowFile) {
-
+    public void modifyAttributes(final FlowFile flowFile) {
+        modifyAttributes(flowFile, null);
     }
 
     @Override
-    public void create(FlowFile flowFile, String details) {
+    public void modifyAttributes(final FlowFile flowFile, final String 
details) {
+        verifyFlowFileKnown(flowFile);
 
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void receive(FlowFile flowFile, String sourceSystemUri, String 
sourceSystemFlowFileIdentifier, String details, long transmissionMillis) {
-
+    public void route(final FlowFile flowFile, final Relationship 
relationship) {
+        route(flowFile, relationship, null);
     }
 
     @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, String 
details) {
+    public void route(final FlowFile flowFile, final Relationship 
relationship, final long processingDuration) {
+        route(flowFile, relationship, null, processingDuration);
+    }
 
+    @Override
+    public void route(final FlowFile flowFile, final Relationship 
relationship, final String details) {
+        route(flowFile, relationship, details, -1L);
     }
 
     @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, String 
details, long transmissionMillis) {
+    public void route(final FlowFile flowFile, final Relationship 
relationship, final String details, final long processingDuration) {
+        verifyFlowFileKnown(flowFile);
 
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
     }
 
     @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, String 
details, boolean force) {
-
+    public void create(final FlowFile flowFile) {
+        create(flowFile, null);
     }
 
     @Override
-    public void send(FlowFile flowFile, String destinationSystemUri, String 
details, long transmissionMillis, boolean force) {
+    public void create(final FlowFile flowFile, final String details) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.CREATE).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
 
+    ProvenanceEventBuilder build(final FlowFile flowFile, final 
ProvenanceEventType eventType) {
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventType(eventType);
+        builder.fromFlowFile(flowFile);
+        builder.setLineageStartDate(flowFile.getLineageStartDate());
+        builder.setComponentId(processorId);
+        builder.setComponentType(processorType);
+        return builder;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index 0f4bbc6..49b8796 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -22,20 +22,22 @@ import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
 
 public class MockSessionFactory implements ProcessSessionFactory {
 
+    private final Processor processor;
     private final SharedSessionState sharedState;
-
     private final Set<MockProcessSession> createdSessions = new 
CopyOnWriteArraySet<>();
 
-    MockSessionFactory(final SharedSessionState sharedState) {
+    MockSessionFactory(final SharedSessionState sharedState, final Processor 
processor) {
         this.sharedState = sharedState;
+        this.processor = processor;
     }
 
     @Override
     public ProcessSession createSession() {
-        final MockProcessSession session = new MockProcessSession(sharedState);
+        final MockProcessSession session = new MockProcessSession(sharedState, 
processor);
         createdSessions.add(session);
         return session;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
index 65d79a6..994735b 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@ -16,11 +16,18 @@
  */
 package org.apache.nifi.util;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
 
 public class SharedSessionState {
@@ -31,14 +38,27 @@ public class SharedSessionState {
     private final Processor processor;
     private final AtomicLong flowFileIdGenerator;
     private final ConcurrentMap<String, AtomicLong> counterMap = new 
ConcurrentHashMap<>();
+    private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
 
     public SharedSessionState(final Processor processor, final AtomicLong 
flowFileIdGenerator) {
         flowFileQueue = new MockFlowFileQueue();
-        provenanceReporter = new MockProvenanceReporter();
+        provenanceReporter = new MockProvenanceReporter(null, this, 
UUID.randomUUID().toString(), "N/A");
         this.flowFileIdGenerator = flowFileIdGenerator;
         this.processor = processor;
     }
 
+    void addProvenanceEvents(final Collection<ProvenanceEventRecord> events) {
+        this.events.addAll(events);
+    }
+
+    void clearProvenanceEvents() {
+        this.events.clear();
+    }
+
+    public List<ProvenanceEventRecord> getProvenanceEvents() {
+        return new ArrayList<>(this.events);
+    }
+
     public MockFlowFileQueue getFlowFileQueue() {
         return flowFileQueue;
     }
@@ -55,7 +75,7 @@ public class SharedSessionState {
         AtomicLong counter = counterMap.get(name);
         if (counter == null) {
             counter = new AtomicLong(0L);
-            AtomicLong existingCounter = counterMap.putIfAbsent(name, counter);
+            final AtomicLong existingCounter = counterMap.putIfAbsent(name, 
counter);
             if (existingCounter != null) {
                 counter = existingCounter;
             }
@@ -66,6 +86,6 @@ public class SharedSessionState {
 
     public Long getCounterValue(final String name) {
         final AtomicLong counterValue = counterMap.get(name);
-        return (counterValue == null) ? null : counterValue.get();
+        return counterValue == null ? null : counterValue.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7048cfe..655f2df 100644
--- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -64,11 +64,10 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.reporting.InitializationException;
 import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class StandardProcessorTestRunner implements TestRunner {
 
@@ -83,7 +82,6 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     private int numThreads = 1;
     private final AtomicInteger invocations = new AtomicInteger(0);
 
-    private static final Logger logger = 
LoggerFactory.getLogger(StandardProcessorTestRunner.class);
     private static final Set<Class<? extends Annotation>> 
deprecatedTypeAnnotations = new HashSet<>();
     private static final Set<Class<? extends Annotation>> 
deprecatedMethodAnnotations = new HashSet<>();
 
@@ -99,7 +97,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         this.idGenerator = new AtomicLong(0L);
         this.sharedState = new SharedSessionState(processor, idGenerator);
         this.flowFileQueue = sharedState.getFlowFileQueue();
-        this.sessionFactory = new MockSessionFactory(sharedState);
+        this.sessionFactory = new MockSessionFactory(sharedState, processor);
         this.context = new MockProcessContext(processor);
 
         detectDeprecatedAnnotations(processor);
@@ -109,7 +107,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
         try {
             ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
processor);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             Assert.fail("Could not invoke methods annotated with @OnAdded 
annotation due to: " + e);
         }
 
@@ -194,7 +192,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
             if (initialize) {
                 try {
                     
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, 
context);
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     e.printStackTrace();
                     Assert.fail("Could not invoke methods annotated with 
@OnScheduled annotation due to: " + e);
                 }
@@ -223,7 +221,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
                         unscheduledRun = true;
                         try {
                             
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, 
context);
-                        } catch (Exception e) {
+                        } catch (final Exception e) {
                             Assert.fail("Could not invoke methods annotated 
with @OnUnscheduled annotation due to: " + e);
                         }
                     }
@@ -234,7 +232,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
             if (!unscheduledRun) {
                 try {
                     
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, 
context);
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     Assert.fail("Could not invoke methods annotated with 
@OnUnscheduled annotation due to: " + e);
                 }
             }
@@ -242,7 +240,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
             if (stopOnFinish) {
                 try {
                     
ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor);
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     Assert.fail("Could not invoke methods annotated with 
@OnStopped annotation due to: " + e);
                 }
             }
@@ -255,7 +253,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     public void shutdown() {
         try {
             ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, 
processor);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             Assert.fail("Could not invoke methods annotated with @OnShutdown 
annotation due to: " + e);
         }
     }
@@ -388,7 +386,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void enqueue(final InputStream data, final Map<String, String> 
attributes) {
-        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, idGenerator));
+        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, idGenerator), processor);
         MockFlowFile flowFile = session.create();
         flowFile = session.importFrom(data, flowFile);
         flowFile = session.putAllAttributes(flowFile, attributes);
@@ -423,7 +421,11 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         return flowFiles;
     }
 
+    /**
+     * @deprecated The ProvenanceReporter should not be accessed through the 
test runner, as it does not expose the events that were emitted.
+     */
     @Override
+    @Deprecated
     public ProvenanceReporter getProvenanceReporter() {
         return sharedState.getProvenanceReporter();
     }
@@ -703,4 +705,14 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         return context.removeProperty(descriptor);
     }
 
+    @Override
+    public List<ProvenanceEventRecord> getProvenanceEvents() {
+        return sharedState.getProvenanceEvents();
+    }
+
+    @Override
+    public void clearProvenanceEvents() {
+        sharedState.clearProvenanceEvents();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index c4fec35..a599e5b 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -32,6 +32,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.reporting.InitializationException;
 
@@ -702,4 +703,18 @@ public interface TestRunner {
      * @return true if removed
      */
     boolean removeProperty(PropertyDescriptor descriptor);
+
+    /**
+     * Returns a {@link List} of all {@link ProvenanceEventRecord}s that were
+     * emitted by the Processor
+     *
+     * @return a List of all Provenance Events that were emitted by the
+     *         Processor
+     */
+    List<ProvenanceEventRecord> getProvenanceEvents();
+
+    /**
+     * Clears the Provenance Events that have been emitted by the Processor
+     */
+    void clearProvenanceEvents();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a048d21..4ee8c06 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -178,7 +178,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             throw new AssertionError("Connectable type is " + 
connectable.getConnectableType());
         }
 
-        this.provenanceReporter = new 
StandardProvenanceReporter(connectable.getIdentifier(), componentType, 
context.getProvenanceRepository(), this);
+        this.provenanceReporter = new StandardProvenanceReporter(this, 
connectable.getIdentifier(), componentType,
+            context.getProvenanceRepository(), this);
         this.sessionId = idGenerator.getAndIncrement();
         this.connectableDescription = description;
 
@@ -324,7 +325,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 }
                 final long flowFileLife = System.currentTimeMillis() - 
flowFile.getEntryDate();
                 final Connectable connectable = context.getConnectable();
-                final Object terminator = (connectable instanceof 
ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
+                final Object terminator = connectable instanceof ProcessorNode 
? ((ProcessorNode) connectable).getProcessor() : connectable;
                 LOG.info("{} terminated by {}; life of FlowFile = {} ms", new 
Object[]{flowFile, terminator, flowFileLife});
             } else if (record.isWorking() && record.getWorkingClaim() != 
record.getOriginalClaim()) {
                 //records which have been updated - remove original if exists
@@ -651,7 +652,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 return new Iterator<ProvenanceEventRecord>() {
                     @Override
                     public boolean hasNext() {
-                        return recordsToSubmitIterator.hasNext() || 
(autoTermIterator != null && autoTermIterator.hasNext());
+                        return recordsToSubmitIterator.hasNext() || 
autoTermIterator != null && autoTermIterator.hasNext();
                     }
 
                     @Override
@@ -1056,8 +1057,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     private void formatNanos(final long nanos, final StringBuilder sb) {
-        final long seconds = (nanos > 1000000000L) ? (nanos / 1000000000L) : 
0L;
-        long millis = (nanos > 1000000L) ? (nanos / 1000000L) : 0L;;
+        final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
+        long millis = nanos > 1000000L ? nanos / 1000000L : 0L;;
         final long nanosLeft = nanos % 1000000L;
 
         if (seconds > 0) {
@@ -1609,7 +1610,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             processorType = connectable.getClass().getSimpleName();
         }
 
-        final StandardProvenanceReporter expiredReporter = new 
StandardProvenanceReporter(connectable.getIdentifier(),
+        final StandardProvenanceReporter expiredReporter = new 
StandardProvenanceReporter(this, connectable.getIdentifier(),
             processorType, context.getProvenanceRepository(), this);
 
         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
@@ -1623,7 +1624,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             removeContent(flowFile.getContentClaim());
 
             final long flowFileLife = System.currentTimeMillis() - 
flowFile.getEntryDate();
-            final Object terminator = (connectable instanceof ProcessorNode) ? 
((ProcessorNode) connectable).getProcessor() : connectable;
+            final Object terminator = connectable instanceof ProcessorNode ? 
((ProcessorNode) connectable).getProcessor() : connectable;
             LOG.info("{} terminated by {} due to FlowFile expiration; life of 
FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
         }
 
@@ -1828,7 +1829,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                     readCount += copied;
 
                     // don't add demarcator after the last claim
-                    if (useDemarcator && (++objectIndex < numSources)) {
+                    if (useDemarcator && ++objectIndex < numSources) {
                         out.write(demarcator);
                         writtenCount += demarcator.length;
                     }
@@ -2488,6 +2489,16 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
     }
 
+    /**
+     * Checks if a FlowFile is known in this session.
+     *
+     * @param flowFile the FlowFile to check
+     * @return <code>true</code> if the FlowFile is known in this session, 
<code>false</code> otherwise.
+     */
+    boolean isFlowFileKnown(final FlowFile flowFile) {
+        return records.containsKey(flowFile);
+    }
+
     @Override
     public FlowFile create(final FlowFile parent) {
         final Map<String, String> newAttributes = new HashMap<>(3);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index a55fb25..5194fef 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -23,12 +23,12 @@ import java.util.Set;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,8 +41,11 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
     private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
     private final ProvenanceEventRepository repository;
     private final ProvenanceEventEnricher eventEnricher;
+    private final StandardProcessSession session;
 
-    public StandardProvenanceReporter(final String processorId, final String 
processorType, final ProvenanceEventRepository repository, final 
ProvenanceEventEnricher enricher) {
+    public StandardProvenanceReporter(final StandardProcessSession session, 
final String processorId, final String processorType,
+        final ProvenanceEventRepository repository, final 
ProvenanceEventEnricher enricher) {
+        this.session = session;
         this.processorId = processorId;
         this.processorType = processorType;
         this.repository = repository;
@@ -89,6 +92,12 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
         return build(flowFile, 
ProvenanceEventType.DROP).setDetails(details).build();
     }
 
+    private void verifyFlowFileKnown(final FlowFile flowFile) {
+        if (session != null && !session.isFlowFileKnown(flowFile)) {
+            throw new FlowFileHandlingException(flowFile + " is not known to " 
+ session);
+        }
+    }
+
     @Override
     public void receive(final FlowFile flowFile, final String transitUri) {
         receive(flowFile, transitUri, -1L);
@@ -111,6 +120,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void receive(final FlowFile flowFile, final String transitUri, 
final String sourceSystemFlowFileIdentifier, final String details, final long 
transmissionMillis) {
+        verifyFlowFileKnown(flowFile);
+
         try {
             final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.RECEIVE)
                     
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
@@ -157,8 +168,7 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
     public void send(final FlowFile flowFile, final String transitUri, final 
String details, final long transmissionMillis, final boolean force) {
         try {
             final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
-
-            final ProvenanceEventRecord enriched = 
eventEnricher.enrich(record, flowFile);
+            final ProvenanceEventRecord enriched = eventEnricher == null ? 
record : eventEnricher.enrich(record, flowFile);
 
             if (force) {
                 repository.registerEvent(enriched);
@@ -252,6 +262,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void fork(final FlowFile parent, final Collection<FlowFile> 
children, final String details, final long forkDuration) {
+        verifyFlowFileKnown(parent);
+
         try {
             final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.FORK);
             eventBuilder.addParentFlowFile(parent);
@@ -293,6 +305,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void join(final Collection<FlowFile> parents, final FlowFile child, 
final String details, final long joinDuration) {
+        verifyFlowFileKnown(child);
+
         try {
             final ProvenanceEventBuilder eventBuilder = build(child, 
ProvenanceEventType.JOIN);
             eventBuilder.addChildFlowFile(child);
@@ -313,6 +327,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void clone(final FlowFile parent, final FlowFile child) {
+        verifyFlowFileKnown(child);
+
         try {
             final ProvenanceEventBuilder eventBuilder = build(parent, 
ProvenanceEventType.CLONE);
             eventBuilder.addChildFlowFile(child);
@@ -343,6 +359,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void modifyContent(final FlowFile flowFile, final String details, 
final long processingMillis) {
+        verifyFlowFileKnown(flowFile);
+
         try {
             final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build();
             events.add(record);
@@ -361,6 +379,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void modifyAttributes(final FlowFile flowFile, final String 
details) {
+        verifyFlowFileKnown(flowFile);
+
         try {
             final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build();
             events.add(record);
@@ -389,6 +409,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void route(final FlowFile flowFile, final Relationship 
relationship, final String details, final long processingDuration) {
+        verifyFlowFileKnown(flowFile);
+
         try {
             final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build();
             events.add(record);
@@ -407,6 +429,8 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
 
     @Override
     public void create(final FlowFile flowFile, final String details) {
+        verifyFlowFileKnown(flowFile);
+
         try {
             final ProvenanceEventRecord record = build(flowFile, 
ProvenanceEventType.CREATE).setDetails(details).build();
             events.add(record);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java
index 07480de..c079163 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.StandardProvenanceReporter;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
@@ -27,7 +25,6 @@ import java.util.Set;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
-
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -38,7 +35,7 @@ public class TestStandardProvenanceReporter {
     @Ignore
     public void testDuplicatesIgnored() {
         final ProvenanceEventRepository mockRepo = 
Mockito.mock(ProvenanceEventRepository.class);
-        final StandardProvenanceReporter reporter = new 
StandardProvenanceReporter("1234", "TestProc", mockRepo, null);
+        final StandardProvenanceReporter reporter = new 
StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, null);
 
         final List<FlowFile> parents = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
@@ -51,7 +48,7 @@ public class TestStandardProvenanceReporter {
         reporter.fork(flowFile, parents);
         reporter.fork(flowFile, parents);
 
-        Set<ProvenanceEventRecord> records = reporter.getEvents();
+        final Set<ProvenanceEventRecord> records = reporter.getEvents();
         assertEquals(11, records.size());   // 1 for each parent in the spawn 
and 1 for the spawn itself
 
         final FlowFile firstParent = parents.get(0);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 9500e29..8dbc4d0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import kafka.common.FailedToSendMessageException;
 import kafka.javaapi.producer.Producer;
@@ -33,21 +32,14 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockFlowFileQueue;
-import org.apache.nifi.util.MockProcessSession;
-import org.apache.nifi.util.MockProvenanceReporter;
-import org.apache.nifi.util.MockSessionFactory;
-import org.apache.nifi.util.SharedSessionState;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestPutKafka {
 
@@ -150,19 +142,7 @@ public class TestPutKafka {
     public void testProvenanceReporterMessagesCount() {
         final TestableProcessor processor = new TestableProcessor();
 
-        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new 
MockProvenanceReporter());
-
-        AtomicLong idGenerator = new AtomicLong(0L);
-        SharedSessionState sharedState = new SharedSessionState(processor, 
idGenerator);
-        Whitebox.setInternalState(sharedState, "provenanceReporter", 
spyProvenanceReporter);
-        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
-        MockSessionFactory sessionFactory = 
Mockito.mock(MockSessionFactory.class);
-        MockProcessSession mockProcessSession = new 
MockProcessSession(sharedState);
-        
Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
-
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
-        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
 
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
@@ -173,28 +153,19 @@ public class TestPutKafka {
         runner.enqueue(bytes);
         runner.run();
 
-        MockFlowFile mockFlowFile = 
mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
-        Mockito.verify(spyProvenanceReporter, 
Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1", "Sent 4 messages");
+        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+        assertEquals(1, events.size());
+        final ProvenanceEventRecord event = events.get(0);
+        assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        assertEquals("kafka://topic1", event.getTransitUri());
+        assertEquals("Sent 4 messages", event.getDetails());
     }
 
     @Test
     public void testProvenanceReporterWithoutDelimiterMessagesCount() {
         final TestableProcessor processor = new TestableProcessor();
 
-        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new 
MockProvenanceReporter());
-
-        AtomicLong idGenerator = new AtomicLong(0L);
-        SharedSessionState sharedState = new SharedSessionState(processor, 
idGenerator);
-        Whitebox.setInternalState(sharedState, "provenanceReporter", 
spyProvenanceReporter);
-        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
-        MockSessionFactory sessionFactory = 
Mockito.mock(MockSessionFactory.class);
-        MockProcessSession mockProcessSession = new 
MockProcessSession(sharedState);
-        
Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
-
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
-        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
-
         runner.setProperty(PutKafka.TOPIC, "topic1");
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
@@ -203,8 +174,11 @@ public class TestPutKafka {
         runner.enqueue(bytes);
         runner.run();
 
-        MockFlowFile mockFlowFile = 
mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
-        Mockito.verify(spyProvenanceReporter, 
Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1");
+        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+        assertEquals(1, events.size());
+        final ProvenanceEventRecord event = events.get(0);
+        assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        assertEquals("kafka://topic1", event.getTransitUri());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java
index 1b75734..37364e5 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java
@@ -27,25 +27,25 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.SharedSessionState;
 import org.junit.Assert;
-
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class NewestFirstPrioritizerTest {
 
     @Test
     public void testPrioritizer() throws InstantiationException, IllegalAccessException {
-        Processor processor = new SimpleProcessor();
-        AtomicLong idGenerator = new AtomicLong(0L);
-        MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator));
+        final Processor processor = new SimpleProcessor();
+        final AtomicLong idGenerator = new AtomicLong(0L);
+        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
 
-        MockFlowFile flowFile1 = session.create();
+        final MockFlowFile flowFile1 = session.create();
         try {
             Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
         }
-        MockFlowFile flowFile2 = session.create();
+        final MockFlowFile flowFile2 = session.create();
 
-        NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer();
+        final NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer();
         Assert.assertEquals(0, prioritizer.compare(null, null));
         Assert.assertEquals(-1, prioritizer.compare(flowFile1, null));
         Assert.assertEquals(1, prioritizer.compare(null, flowFile1));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java
index e46711d..68a8d7d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java
@@ -27,25 +27,25 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.SharedSessionState;
 import org.junit.Assert;
-
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class OldestFirstPrioritizerTest {
 
     @Test
     public void testPrioritizer() throws InstantiationException, IllegalAccessException {
-        Processor processor = new SimpleProcessor();
-        AtomicLong idGenerator = new AtomicLong(0L);
-        MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator));
+        final Processor processor = new SimpleProcessor();
+        final AtomicLong idGenerator = new AtomicLong(0L);
+        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
 
-        MockFlowFile flowFile1 = session.create();
+        final MockFlowFile flowFile1 = session.create();
         try {
             Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
         }
-        MockFlowFile flowFile2 = session.create();
+        final MockFlowFile flowFile2 = session.create();
 
-        OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer();
+        final OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer();
         Assert.assertEquals(0, prioritizer.compare(null, null));
         Assert.assertEquals(-1, prioritizer.compare(flowFile1, null));
         Assert.assertEquals(1, prioritizer.compare(null, flowFile1));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
index d303423..d7d278c 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.prioritizer;
 
 import org.apache.nifi.prioritizer.PriorityAttributePrioritizer;
+
 import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
@@ -31,9 +32,9 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.SharedSessionState;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class PriorityAttributePrioritizerTest {
 
@@ -58,27 +59,27 @@ public class PriorityAttributePrioritizerTest {
 
     @Test
     public void testPrioritizer() throws InstantiationException, IllegalAccessException {
-        Processor processor = new SimpleProcessor();
-        AtomicLong idGenerator = new AtomicLong(0L);
-        MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator));
+        final Processor processor = new SimpleProcessor();
+        final AtomicLong idGenerator = new AtomicLong(0L);
+        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
 
-        MockFlowFile ffNoPriority = session.create();
-        MockFlowFile ffPri1 = session.create();
+        final MockFlowFile ffNoPriority = session.create();
+        final MockFlowFile ffPri1 = session.create();
         ffPri1.putAttributes(attrsPri1);
-        MockFlowFile ffPri2 = session.create();
+        final MockFlowFile ffPri2 = session.create();
         ffPri2.putAttributes(attrsPri2);
-        MockFlowFile ffPrin1 = session.create();
+        final MockFlowFile ffPrin1 = session.create();
         ffPrin1.putAttributes(attrsPrin1);
-        MockFlowFile ffPriA = session.create();
+        final MockFlowFile ffPriA = session.create();
         ffPriA.putAttributes(attrsPriA);
-        MockFlowFile ffPriB = session.create();
+        final MockFlowFile ffPriB = session.create();
         ffPriB.putAttributes(attrsPriB);
-        MockFlowFile ffPriLP = session.create();
+        final MockFlowFile ffPriLP = session.create();
         ffPriLP.putAttributes(attrsPriLP);
-        MockFlowFile ffPriLN = session.create();
+        final MockFlowFile ffPriLN = session.create();
         ffPriLN.putAttributes(attrsPriLN);
 
-        PriorityAttributePrioritizer prioritizer = new PriorityAttributePrioritizer();
+        final PriorityAttributePrioritizer prioritizer = new PriorityAttributePrioritizer();
         assertEquals(0, prioritizer.compare(null, null));
         assertEquals(-1, prioritizer.compare(ffNoPriority, null));
         assertEquals(1, prioritizer.compare(null, ffNoPriority));

Reply via email to