Repository: incubator-nifi Updated Branches: refs/heads/develop 28abc9acb -> 739baa2e5
NIFI-37: Ensure that prov events that are emitted are emitted against FlowFiles that are known to that session. Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/739baa2e Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/739baa2e Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/739baa2e Branch: refs/heads/develop Commit: 739baa2e5711b0677a451c4acadb10e232d5bf2b Parents: 28abc9a Author: Mark Payne <[email protected]> Authored: Tue Jun 16 20:32:22 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Jun 17 07:56:49 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/util/MockProcessSession.java | 40 +- .../nifi/util/MockProvenanceReporter.java | 365 ++++++++++++++++--- .../apache/nifi/util/MockSessionFactory.java | 8 +- .../apache/nifi/util/SharedSessionState.java | 26 +- .../nifi/util/StandardProcessorTestRunner.java | 34 +- .../java/org/apache/nifi/util/TestRunner.java | 15 + .../repository/StandardProcessSession.java | 27 +- .../repository/StandardProvenanceReporter.java | 32 +- .../TestStandardProvenanceReporter.java | 7 +- .../nifi/processors/kafka/TestPutKafka.java | 52 +-- .../prioritizer/NewestFirstPrioritizerTest.java | 16 +- .../prioritizer/OldestFirstPrioritizerTest.java | 16 +- .../PriorityAttributePrioritizerTest.java | 27 +- 13 files changed, 499 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index e9bb778..13198dc 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -40,11 +40,11 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; -import org.junit.Assert; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileAccessException; @@ -54,6 +54,7 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.provenance.ProvenanceReporter; +import org.junit.Assert; public class MockProcessSession implements ProcessSession { @@ -65,14 +66,16 @@ public class MockProcessSession implements ProcessSession { private final Map<Long, MockFlowFile> originalVersions = new HashMap<>(); private final SharedSessionState sharedState; private final Map<String, Long> counterMap = new HashMap<>(); + private final ProvenanceReporter provenanceReporter; private boolean committed = false; private boolean rolledback = false; private int removedCount = 0; - public MockProcessSession(final SharedSessionState sharedState) { + public MockProcessSession(final SharedSessionState sharedState, final Processor processor) { this.sharedState = sharedState; this.processorQueue = sharedState.getFlowFileQueue(); + provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName()); } @Override @@ -194,7 +197,7 @@ public class MockProcessSession implements ProcessSession { try { out.write(mock.getData()); - } catch (IOException e) { + } catch (final IOException e) { throw new FlowFileAccessException(e.toString(), e); } } @@ -409,7 +412,7 @@ public class MockProcessSession implements ProcessSession { final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); try { callback.process(bais); - } catch (IOException e) { + } catch (final IOException e) { throw new ProcessException(e.toString(), e); } } @@ -766,7 +769,7 @@ public class MockProcessSession implements ProcessSession { if (source == null || destination == null || source == destination) { return destination; //don't need to inherit from ourselves } - FlowFile updated = putAllAttributes(destination, source.getAttributes()); + final FlowFile updated = putAllAttributes(destination, source.getAttributes()); getProvenanceReporter().fork(source, Collections.singletonList(updated)); return updated; } @@ -803,7 +806,7 @@ public class MockProcessSession implements ProcessSession { } } - FlowFile updated = putAllAttributes(destination, intersectAttributes(sources)); + final FlowFile updated = putAllAttributes(destination, intersectAttributes(sources)); getProvenanceReporter().join(sources, updated); return updated; } @@ -982,7 +985,7 @@ public class MockProcessSession implements ProcessSession { @Override public ProvenanceReporter getProvenanceReporter() { - return sharedState.getProvenanceReporter(); + return provenanceReporter; } @Override @@ -997,4 +1000,27 @@ public class MockProcessSession implements ProcessSession { validateState(flowFile); return flowFile.getData(); } + + /** + * Checks if a FlowFile is known in this session. + * + * @param flowFile + * the FlowFile to check + * @return <code>true</code> if the FlowFile is known in this session, + * <code>false</code> otherwise. + */ + boolean isFlowFileKnown(final FlowFile flowFile) { + final FlowFile curFlowFile = currentVersions.get(flowFile.getId()); + if (curFlowFile == null) { + return false; + } + + final String curUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key()); + final String providedUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key()); + if (!curUuid.equals(providedUuid)) { + return false; + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java index 097eafd..8c9a320 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java @@ -17,186 +17,437 @@ package org.apache.nifi.util; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileHandlingException; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MockProvenanceReporter implements ProvenanceReporter { + private static final Logger logger = LoggerFactory.getLogger(MockProvenanceReporter.class); + private final MockProcessSession session; + private final String processorId; + private final String processorType; + private final SharedSessionState sharedSessionState; + private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>(); - @Override - public void receive(FlowFile flowFile, String sourceSystemUri) { - + public MockProvenanceReporter(final MockProcessSession session, final SharedSessionState sharedState, final String processorId, final String processorType) { + this.session = session; + this.sharedSessionState = sharedState; + this.processorId = processorId; + this.processorType = processorType; } - @Override - public void send(FlowFile flowFile, String destinationSystemUri) { + private void verifyFlowFileKnown(final FlowFile flowFile) { + if (session != null && !session.isFlowFileKnown(flowFile)) { + throw new FlowFileHandlingException(flowFile + " is not known to " + session); + } + } + Set<ProvenanceEventRecord> getEvents() { + return Collections.unmodifiableSet(events); } - @Override - public void send(FlowFile flowFile, String destinationSystemUri, boolean force) { + /** + * Removes the given event from the reporter + * + * @param event + * event + */ + void remove(final ProvenanceEventRecord event) { + events.remove(event); + } + void clear() { + events.clear(); } - @Override - public void receive(FlowFile flowFile, String sourceSystemUri, long transmissionMillis) { + /** + * Generates a Fork event for the given child and parents but does not + * register the event. This is useful so that a ProcessSession has the + * ability to de-dupe events, since one or more events may be created by the + * session itself, as well as by the Processor + * + * @param parents + * parents + * @param child + * child + * @return record + */ + ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> parents, final FlowFile child) { + final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); + eventBuilder.addChildFlowFile(child); - } + for (final FlowFile parent : parents) { + eventBuilder.addParentFlowFile(parent); + } - @Override - public void receive(FlowFile flowFile, String sourceSystemUri, String sourceSystemFlowFileIdentifier) { + return eventBuilder.build(); + } + ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final String details) { + return build(flowFile, ProvenanceEventType.DROP).setDetails(details).build(); } @Override - public void receive(FlowFile flowFile, String sourceSystemUri, String sourceSystemFlowFileIdentifier, long transmissionMillis) { - + public void receive(final FlowFile flowFile, final String transitUri) { + receive(flowFile, transitUri, -1L); } @Override - public void send(FlowFile flowFile, String destinationSystemUri, long transmissionMillis) { - + public void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier) { + receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L); } @Override - public void send(FlowFile flowFile, String destinationSystemUri, long transmissionMillis, boolean force) { - + public void receive(final FlowFile flowFile, final String transitUri, final long transmissionMillis) { + receive(flowFile, transitUri, null, transmissionMillis); } @Override - public void associate(FlowFile flowFile, String alternateIdentifierNamespace, String alternateIdentifier) { - + public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final long transmissionMillis) { + receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, transmissionMillis); } @Override - public void fork(FlowFile parent, Collection<FlowFile> children) { + public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final String details, final long transmissionMillis) { + verifyFlowFileKnown(flowFile); + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) + .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void fork(FlowFile parent, Collection<FlowFile> children, long forkDuration) { - + public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis) { + send(flowFile, transitUri, transmissionMillis, true); } @Override - public void fork(FlowFile parent, Collection<FlowFile> children, String details) { - + public void send(final FlowFile flowFile, final String transitUri) { + send(flowFile, transitUri, null, -1L, true); } @Override - public void fork(FlowFile parent, java.util.Collection<FlowFile> children, String details, long forkDuration) { - + public void send(final FlowFile flowFile, final String transitUri, final String details) { + send(flowFile, transitUri, details, -1L, true); } @Override - public void join(Collection<FlowFile> parents, FlowFile child) { - + public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis, final boolean force) { + send(flowFile, transitUri, null, transmissionMillis, force); } @Override - public void join(Collection<FlowFile> parents, FlowFile child, long joinDuration) { + public void send(final FlowFile flowFile, final String transitUri, final String details, final boolean force) { + send(flowFile, transitUri, details, -1L, force); + } + @Override + public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) { + send(flowFile, transitUri, details, transmissionMillis, true); } @Override - public void join(Collection<FlowFile> parents, FlowFile child, String details) { + public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) { + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); + if (force) { + sharedSessionState.addProvenanceEvents(Collections.singleton(record)); + } else { + events.add(record); + } + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + @Override + public void send(final FlowFile flowFile, final String transitUri, final boolean force) { + send(flowFile, transitUri, -1L, true); } @Override - public void join(java.util.Collection<FlowFile> parents, FlowFile child, String details, long joinDuration) { + public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) { + try { + String trimmedNamespace = alternateIdentifierNamespace.trim(); + if (trimmedNamespace.endsWith(":")) { + trimmedNamespace = trimmedNamespace.substring(0, trimmedNamespace.length() - 1); + } + + String trimmedIdentifier = alternateIdentifier.trim(); + if (trimmedIdentifier.startsWith(":")) { + if (trimmedIdentifier.length() == 1) { + throw new IllegalArgumentException("Illegal alternateIdentifier: " + alternateIdentifier); + } + trimmedIdentifier = trimmedIdentifier.substring(1); + } + final String alternateIdentifierUri = trimmedNamespace + ":" + trimmedIdentifier; + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } - @Override - public void clone(FlowFile parent, FlowFile child) { + ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) { + try { + final ProvenanceEventBuilder builder = build(flowFile, ProvenanceEventType.DROP); + if (reason != null) { + builder.setDetails("Discard reason: " + reason); + } + final ProvenanceEventRecord record = builder.build(); + events.add(record); + return record; + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + return null; + } + } + void expire(final FlowFile flowFile, final String details) { + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.EXPIRE).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void modifyContent(FlowFile flowFile) { - + public void fork(final FlowFile parent, final Collection<FlowFile> children) { + fork(parent, children, null, -1L); } @Override - public void modifyContent(FlowFile flowFile, String details) { + public void fork(final FlowFile parent, final Collection<FlowFile> children, final long forkDuration) { + fork(parent, children, null, forkDuration); + } + @Override + public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details) { + fork(parent, children, details, -1L); } @Override - public void modifyContent(FlowFile flowFile, long processingMillis) { + public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details, final long forkDuration) { + verifyFlowFileKnown(parent); + + try { + final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.FORK); + eventBuilder.addParentFlowFile(parent); + for (final FlowFile child : children) { + eventBuilder.addChildFlowFile(child); + } + if (forkDuration > -1L) { + eventBuilder.setEventDuration(forkDuration); + } + + if (details != null) { + eventBuilder.setDetails(details); + } + + events.add(eventBuilder.build()); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void modifyContent(FlowFile flowFile, String details, long processingMillis) { - + public void join(final Collection<FlowFile> parents, final FlowFile child) { + join(parents, child, null, -1L); } @Override - public void modifyAttributes(FlowFile flowFile) { + public void join(final Collection<FlowFile> parents, final FlowFile child, final long joinDuration) { + join(parents, child, null, joinDuration); + } + @Override + public void join(final Collection<FlowFile> parents, final FlowFile child, final String details) { + join(parents, child, details, -1L); } @Override - public void modifyAttributes(FlowFile flowFile, String details) { + public void join(final Collection<FlowFile> parents, final FlowFile child, final String details, final long joinDuration) { + verifyFlowFileKnown(child); + + try { + final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); + eventBuilder.addChildFlowFile(child); + eventBuilder.setDetails(details); + for (final FlowFile parent : parents) { + eventBuilder.addParentFlowFile(parent); + } + + events.add(eventBuilder.build()); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void route(FlowFile flowFile, Relationship relationship) { + public void clone(final FlowFile parent, final FlowFile child) { + verifyFlowFileKnown(child); + try { + final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE); + eventBuilder.addChildFlowFile(child); + eventBuilder.addParentFlowFile(parent); + events.add(eventBuilder.build()); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void route(FlowFile flowFile, Relationship relationship, String details) { - + public void modifyContent(final FlowFile flowFile) { + modifyContent(flowFile, null, -1L); } @Override - public void route(FlowFile flowFile, Relationship relationship, long processingDuration) { + public void modifyContent(final FlowFile flowFile, final String details) { + modifyContent(flowFile, details, -1L); + } + @Override + public void modifyContent(final FlowFile flowFile, final long processingMillis) { + modifyContent(flowFile, null, processingMillis); } @Override - public void route(FlowFile flowFile, Relationship relationship, String details, long processingDuration) { + public void modifyContent(final FlowFile flowFile, final String details, final long processingMillis) { + verifyFlowFileKnown(flowFile); + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void create(FlowFile flowFile) { - + public void modifyAttributes(final FlowFile flowFile) { + modifyAttributes(flowFile, null); } @Override - public void create(FlowFile flowFile, String details) { + public void modifyAttributes(final FlowFile flowFile, final String details) { + verifyFlowFileKnown(flowFile); + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void receive(FlowFile flowFile, String sourceSystemUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis) { - + public void route(final FlowFile flowFile, final Relationship relationship) { + route(flowFile, relationship, null); } @Override - public void send(FlowFile flowFile, String destinationSystemUri, String details) { + public void route(final FlowFile flowFile, final Relationship relationship, final long processingDuration) { + route(flowFile, relationship, null, processingDuration); + } + @Override + public void route(final FlowFile flowFile, final Relationship relationship, final String details) { + route(flowFile, relationship, details, -1L); } @Override - public void send(FlowFile flowFile, String destinationSystemUri, String details, long transmissionMillis) { + public void route(final FlowFile flowFile, final Relationship relationship, final String details, final long processingDuration) { + verifyFlowFileKnown(flowFile); + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } } @Override - public void send(FlowFile flowFile, String destinationSystemUri, String details, boolean force) { - + public void create(final FlowFile flowFile) { + create(flowFile, null); } @Override - public void send(FlowFile flowFile, String destinationSystemUri, String details, long transmissionMillis, boolean force) { + public void create(final FlowFile flowFile, final String details) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventType(eventType); + builder.fromFlowFile(flowFile); + builder.setLineageStartDate(flowFile.getLineageStartDate()); + builder.setComponentId(processorId); + builder.setComponentType(processorType); + return builder; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java index 0f4bbc6..49b8796 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java @@ -22,20 +22,22 @@ import java.util.concurrent.CopyOnWriteArraySet; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; public class MockSessionFactory implements ProcessSessionFactory { + private final Processor processor; private final SharedSessionState sharedState; - private final Set<MockProcessSession> createdSessions = new CopyOnWriteArraySet<>(); - MockSessionFactory(final SharedSessionState sharedState) { + MockSessionFactory(final SharedSessionState sharedState, final Processor processor) { this.sharedState = sharedState; + this.processor = processor; } @Override public ProcessSession createSession() { - final MockProcessSession session = new MockProcessSession(sharedState); + final MockProcessSession session = new MockProcessSession(sharedState, processor); createdSessions.add(session); return session; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java index 65d79a6..994735b 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java @@ -16,11 +16,18 @@ */ package org.apache.nifi.util; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.processor.Processor; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; public class SharedSessionState { @@ -31,14 +38,27 @@ public class SharedSessionState { private final Processor processor; private final AtomicLong flowFileIdGenerator; private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>(); + private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>(); public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) { flowFileQueue = new MockFlowFileQueue(); - provenanceReporter = new MockProvenanceReporter(); + provenanceReporter = new MockProvenanceReporter(null, this, UUID.randomUUID().toString(), "N/A"); this.flowFileIdGenerator = flowFileIdGenerator; this.processor = processor; } + void addProvenanceEvents(final Collection<ProvenanceEventRecord> events) { + this.events.addAll(events); + } + + void clearProvenanceEvents() { + this.events.clear(); + } + + public List<ProvenanceEventRecord> getProvenanceEvents() { + return new ArrayList<>(this.events); + } + public MockFlowFileQueue getFlowFileQueue() { return flowFileQueue; } @@ -55,7 +75,7 @@ public class SharedSessionState { AtomicLong counter = counterMap.get(name); if (counter == null) { counter = new AtomicLong(0L); - AtomicLong existingCounter = counterMap.putIfAbsent(name, counter); + final AtomicLong existingCounter = counterMap.putIfAbsent(name, counter); if (existingCounter != null) { counter = existingCounter; } @@ -66,6 +86,6 @@ public class SharedSessionState { public Long getCounterValue(final String name) { final AtomicLong counterValue = counterMap.get(name); - return (counterValue == null) ? null : counterValue.get(); + return counterValue == null ? null : counterValue.get(); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 7048cfe..655f2df 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -64,11 +64,10 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.reporting.InitializationException; import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class StandardProcessorTestRunner implements TestRunner { @@ -83,7 +82,6 @@ public class StandardProcessorTestRunner implements TestRunner { private int numThreads = 1; private final AtomicInteger invocations = new AtomicInteger(0); - private static final Logger logger = LoggerFactory.getLogger(StandardProcessorTestRunner.class); private static final Set<Class<? extends Annotation>> deprecatedTypeAnnotations = new HashSet<>(); private static final Set<Class<? extends Annotation>> deprecatedMethodAnnotations = new HashSet<>(); @@ -99,7 +97,7 @@ public class StandardProcessorTestRunner implements TestRunner { this.idGenerator = new AtomicLong(0L); this.sharedState = new SharedSessionState(processor, idGenerator); this.flowFileQueue = sharedState.getFlowFileQueue(); - this.sessionFactory = new MockSessionFactory(sharedState); + this.sessionFactory = new MockSessionFactory(sharedState, processor); this.context = new MockProcessContext(processor); detectDeprecatedAnnotations(processor); @@ -109,7 +107,7 @@ public class StandardProcessorTestRunner implements TestRunner { try { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); - } catch (Exception e) { + } catch (final Exception e) { Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + e); } @@ -194,7 +192,7 @@ public class StandardProcessorTestRunner implements TestRunner { if (initialize) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e); } @@ -223,7 +221,7 @@ public class StandardProcessorTestRunner implements TestRunner { unscheduledRun = true; try { ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context); - } catch (Exception e) { + } catch (final Exception e) { Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e); } } @@ -234,7 +232,7 @@ public class StandardProcessorTestRunner implements TestRunner { if (!unscheduledRun) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context); - } catch (Exception e) { + } catch (final Exception e) { Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e); } } @@ -242,7 +240,7 @@ public class StandardProcessorTestRunner implements TestRunner { if (stopOnFinish) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor); - } catch (Exception e) { + } catch (final Exception e) { Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e); } } @@ -255,7 +253,7 @@ public class StandardProcessorTestRunner implements TestRunner { public void shutdown() { try { ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor); - } catch (Exception e) { + } catch (final Exception e) { Assert.fail("Could not invoke methods annotated with @OnShutdown annotation due to: " + e); } } @@ -388,7 +386,7 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void enqueue(final InputStream data, final Map<String, String> attributes) { - final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator)); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor); MockFlowFile flowFile = session.create(); flowFile = session.importFrom(data, flowFile); flowFile = session.putAllAttributes(flowFile, attributes); @@ -423,7 +421,11 @@ public class StandardProcessorTestRunner implements TestRunner { return flowFiles; } + /** + * @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted. + */ @Override + @Deprecated public ProvenanceReporter getProvenanceReporter() { return sharedState.getProvenanceReporter(); } @@ -703,4 +705,14 @@ public class StandardProcessorTestRunner implements TestRunner { return context.removeProperty(descriptor); } + @Override + public List<ProvenanceEventRecord> getProvenanceEvents() { + return sharedState.getProvenanceEvents(); + } + + @Override + public void clearProvenanceEvents() { + sharedState.clearProvenanceEvents(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index c4fec35..a599e5b 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -32,6 +32,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.reporting.InitializationException; @@ -702,4 +703,18 @@ public interface TestRunner { * @return true if removed */ boolean removeProperty(PropertyDescriptor descriptor); + + /** + * Returns a {@link List} of all {@link ProvenanceEventRecord}s that were + * emitted by the Processor + * + * @return a List of all Provenance Events that were emitted by the + * Processor + */ + List<ProvenanceEventRecord> getProvenanceEvents(); + + /** + * Clears the Provenance Events that have been emitted by the Processor + */ + void clearProvenanceEvents(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index a048d21..4ee8c06 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -178,7 +178,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new AssertionError("Connectable type is " + connectable.getConnectableType()); } - this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this); + this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType, + context.getProvenanceRepository(), this); this.sessionId = idGenerator.getAndIncrement(); this.connectableDescription = description; @@ -324,7 +325,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); final Connectable connectable = context.getConnectable(); - final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable; + final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { //records which have been updated - remove original if exists @@ -651,7 +652,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return new Iterator<ProvenanceEventRecord>() { @Override public boolean hasNext() { - return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext()); + return recordsToSubmitIterator.hasNext() || autoTermIterator != null && autoTermIterator.hasNext(); } @Override @@ -1056,8 +1057,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private void formatNanos(final long nanos, final StringBuilder sb) { - final long seconds = (nanos > 1000000000L) ? (nanos / 1000000000L) : 0L; - long millis = (nanos > 1000000L) ? (nanos / 1000000L) : 0L;; + final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L; + long millis = nanos > 1000000L ? nanos / 1000000L : 0L;; final long nanosLeft = nanos % 1000000L; if (seconds > 0) { @@ -1609,7 +1610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE processorType = connectable.getClass().getSimpleName(); } - final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(), + final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), processorType, context.getProvenanceRepository(), this); final Map<String, FlowFileRecord> recordIdMap = new HashMap<>(); @@ -1623,7 +1624,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeContent(flowFile.getContentClaim()); final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); - final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable; + final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); } @@ -1828,7 +1829,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE readCount += copied; // don't add demarcator after the last claim - if (useDemarcator && (++objectIndex < numSources)) { + if (useDemarcator && ++objectIndex < numSources) { out.write(demarcator); writtenCount += demarcator.length; } @@ -2488,6 +2489,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + /** + * Checks if a FlowFile is known in this session. + * + * @param flowFile the FlowFile to check + * @return <code>true</code> if the FlowFile is known in this session, <code>false</code> otherwise. + */ + boolean isFlowFileKnown(final FlowFile flowFile) { + return records.containsKey(flowFile); + } + @Override public FlowFile create(final FlowFile parent) { final Map<String, String> newAttributes = new HashMap<>(3); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index a55fb25..5194fef 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -23,12 +23,12 @@ import java.util.Set; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileHandlingException; import org.apache.nifi.provenance.ProvenanceEventBuilder; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +41,11 @@ public class StandardProvenanceReporter implements ProvenanceReporter { private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>(); private final ProvenanceEventRepository repository; private final ProvenanceEventEnricher eventEnricher; + private final StandardProcessSession session; - public StandardProvenanceReporter(final String processorId, final String processorType, final ProvenanceEventRepository repository, final ProvenanceEventEnricher enricher) { + public StandardProvenanceReporter(final StandardProcessSession session, final String processorId, final String processorType, + final ProvenanceEventRepository repository, final ProvenanceEventEnricher enricher) { + this.session = session; this.processorId = processorId; this.processorType = processorType; this.repository = repository; @@ -89,6 +92,12 @@ public class StandardProvenanceReporter implements ProvenanceReporter { return build(flowFile, ProvenanceEventType.DROP).setDetails(details).build(); } + private void verifyFlowFileKnown(final FlowFile flowFile) { + if (session != null && !session.isFlowFileKnown(flowFile)) { + throw new FlowFileHandlingException(flowFile + " is not known to " + session); + } + } + @Override public void receive(final FlowFile flowFile, final String transitUri) { receive(flowFile, transitUri, -1L); @@ -111,6 +120,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final String details, final long transmissionMillis) { + verifyFlowFileKnown(flowFile); + try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); @@ -157,8 +168,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) { try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); - - final ProvenanceEventRecord enriched = eventEnricher.enrich(record, flowFile); + final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile); if (force) { repository.registerEvent(enriched); @@ -252,6 +262,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details, final long forkDuration) { + verifyFlowFileKnown(parent); + try { final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.FORK); eventBuilder.addParentFlowFile(parent); @@ -293,6 +305,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void join(final Collection<FlowFile> parents, final FlowFile child, final String details, final long joinDuration) { + verifyFlowFileKnown(child); + try { final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); eventBuilder.addChildFlowFile(child); @@ -313,6 +327,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void clone(final FlowFile parent, final FlowFile child) { + verifyFlowFileKnown(child); + try { final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE); eventBuilder.addChildFlowFile(child); @@ -343,6 +359,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void modifyContent(final FlowFile flowFile, final String details, final long processingMillis) { + verifyFlowFileKnown(flowFile); + try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build(); events.add(record); @@ -361,6 +379,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void modifyAttributes(final FlowFile flowFile, final String details) { + verifyFlowFileKnown(flowFile); + try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build(); events.add(record); @@ -389,6 +409,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void route(final FlowFile flowFile, final Relationship relationship, final String details, final long processingDuration) { + verifyFlowFileKnown(flowFile); + try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build(); events.add(record); @@ -407,6 +429,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void create(final FlowFile flowFile, final String details) { + verifyFlowFileKnown(flowFile); + try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build(); events.add(record); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java index 07480de..c079163 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProvenanceReporter.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.repository.StandardProvenanceReporter; -import org.apache.nifi.controller.repository.StandardFlowFileRecord; import static org.junit.Assert.assertEquals; import java.util.ArrayList; @@ -27,7 +25,6 @@ import java.util.Set; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; - import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; @@ -38,7 +35,7 @@ public class TestStandardProvenanceReporter { @Ignore public void testDuplicatesIgnored() { final ProvenanceEventRepository mockRepo = Mockito.mock(ProvenanceEventRepository.class); - final StandardProvenanceReporter reporter = new StandardProvenanceReporter("1234", "TestProc", mockRepo, null); + final StandardProvenanceReporter reporter = new StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, null); final List<FlowFile> parents = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -51,7 +48,7 @@ public class TestStandardProvenanceReporter { reporter.fork(flowFile, parents); reporter.fork(flowFile, parents); - Set<ProvenanceEventRecord> records = reporter.getEvents(); + final Set<ProvenanceEventRecord> records = reporter.getEvents(); assertEquals(11, records.size()); // 1 for each parent in the spawn and 1 for the spawn itself final FlowFile firstParent = parents.get(0); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 9500e29..8dbc4d0 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import kafka.common.FailedToSendMessageException; import kafka.javaapi.producer.Producer; @@ -33,21 +32,14 @@ import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.nifi.annotation.lifecycle.OnScheduled; - import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockFlowFileQueue; -import org.apache.nifi.util.MockProcessSession; -import org.apache.nifi.util.MockProvenanceReporter; -import org.apache.nifi.util.MockSessionFactory; -import org.apache.nifi.util.SharedSessionState; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.internal.util.reflection.Whitebox; public class TestPutKafka { @@ -150,19 +142,7 @@ public class TestPutKafka { public void testProvenanceReporterMessagesCount() { final TestableProcessor processor = new TestableProcessor(); - ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter()); - - AtomicLong idGenerator = new AtomicLong(0L); - SharedSessionState sharedState = new SharedSessionState(processor, idGenerator); - Whitebox.setInternalState(sharedState, "provenanceReporter", spyProvenanceReporter); - MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue(); - MockSessionFactory sessionFactory = Mockito.mock(MockSessionFactory.class); - MockProcessSession mockProcessSession = new MockProcessSession(sharedState); - Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession); - final TestRunner runner = TestRunners.newTestRunner(processor); - Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue); - Whitebox.setInternalState(runner, "sessionFactory", sessionFactory); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); @@ -173,28 +153,19 @@ public class TestPutKafka { runner.enqueue(bytes); runner.run(); - MockFlowFile mockFlowFile = mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); - Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1", "Sent 4 messages"); + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.SEND, event.getEventType()); + assertEquals("kafka://topic1", event.getTransitUri()); + assertEquals("Sent 4 messages", event.getDetails()); } @Test public void testProvenanceReporterWithoutDelimiterMessagesCount() { final TestableProcessor processor = new TestableProcessor(); - ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter()); - - AtomicLong idGenerator = new AtomicLong(0L); - SharedSessionState sharedState = new SharedSessionState(processor, idGenerator); - Whitebox.setInternalState(sharedState, "provenanceReporter", spyProvenanceReporter); - MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue(); - MockSessionFactory sessionFactory = Mockito.mock(MockSessionFactory.class); - MockProcessSession mockProcessSession = new MockProcessSession(sharedState); - Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession); - final TestRunner runner = TestRunners.newTestRunner(processor); - Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue); - Whitebox.setInternalState(runner, "sessionFactory", sessionFactory); - runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); @@ -203,8 +174,11 @@ public class TestPutKafka { runner.enqueue(bytes); runner.run(); - MockFlowFile mockFlowFile = mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); - Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1"); + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.SEND, event.getEventType()); + assertEquals("kafka://topic1", event.getTransitUri()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java index 1b75734..37364e5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/NewestFirstPrioritizerTest.java @@ -27,25 +27,25 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.SharedSessionState; import org.junit.Assert; - import org.junit.Test; +import org.mockito.Mockito; public class NewestFirstPrioritizerTest { @Test public void testPrioritizer() throws InstantiationException, IllegalAccessException { - Processor processor = new SimpleProcessor(); - AtomicLong idGenerator = new AtomicLong(0L); - MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator)); + final Processor processor = new SimpleProcessor(); + final AtomicLong idGenerator = new AtomicLong(0L); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); - MockFlowFile flowFile1 = session.create(); + final MockFlowFile flowFile1 = session.create(); try { Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 - } catch (InterruptedException e) { + } catch (final InterruptedException e) { } - MockFlowFile flowFile2 = session.create(); + final MockFlowFile flowFile2 = session.create(); - NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer(); + final NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer(); Assert.assertEquals(0, prioritizer.compare(null, null)); Assert.assertEquals(-1, prioritizer.compare(flowFile1, null)); Assert.assertEquals(1, prioritizer.compare(null, flowFile1)); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java index e46711d..68a8d7d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/OldestFirstPrioritizerTest.java @@ -27,25 +27,25 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.SharedSessionState; import org.junit.Assert; - import org.junit.Test; +import org.mockito.Mockito; public class OldestFirstPrioritizerTest { @Test public void testPrioritizer() throws InstantiationException, IllegalAccessException { - Processor processor = new SimpleProcessor(); - AtomicLong idGenerator = new AtomicLong(0L); - MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator)); + final Processor processor = new SimpleProcessor(); + final AtomicLong idGenerator = new AtomicLong(0L); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); - MockFlowFile flowFile1 = session.create(); + final MockFlowFile flowFile1 = session.create(); try { Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1 - } catch (InterruptedException e) { + } catch (final InterruptedException e) { } - MockFlowFile flowFile2 = session.create(); + final MockFlowFile flowFile2 = session.create(); - OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer(); + final OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer(); Assert.assertEquals(0, prioritizer.compare(null, null)); Assert.assertEquals(-1, prioritizer.compare(flowFile1, null)); Assert.assertEquals(1, prioritizer.compare(null, flowFile1)); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739baa2e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java index d303423..d7d278c 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.prioritizer; import org.apache.nifi.prioritizer.PriorityAttributePrioritizer; + import static org.junit.Assert.assertEquals; import java.util.HashMap; @@ -31,9 +32,9 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.SharedSessionState; - import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; public class PriorityAttributePrioritizerTest { @@ -58,27 +59,27 @@ public class PriorityAttributePrioritizerTest { @Test public void testPrioritizer() throws InstantiationException, IllegalAccessException { - Processor processor = new SimpleProcessor(); - AtomicLong idGenerator = new AtomicLong(0L); - MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator)); + final Processor processor = new SimpleProcessor(); + final AtomicLong idGenerator = new AtomicLong(0L); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); - MockFlowFile ffNoPriority = session.create(); - MockFlowFile ffPri1 = session.create(); + final MockFlowFile ffNoPriority = session.create(); + final MockFlowFile ffPri1 = session.create(); ffPri1.putAttributes(attrsPri1); - MockFlowFile ffPri2 = session.create(); + final MockFlowFile ffPri2 = session.create(); ffPri2.putAttributes(attrsPri2); - MockFlowFile ffPrin1 = session.create(); + final MockFlowFile ffPrin1 = session.create(); ffPrin1.putAttributes(attrsPrin1); - MockFlowFile ffPriA = session.create(); + final MockFlowFile ffPriA = session.create(); ffPriA.putAttributes(attrsPriA); - MockFlowFile ffPriB = session.create(); + final MockFlowFile ffPriB = session.create(); ffPriB.putAttributes(attrsPriB); - MockFlowFile ffPriLP = session.create(); + final MockFlowFile ffPriLP = session.create(); ffPriLP.putAttributes(attrsPriLP); - MockFlowFile ffPriLN = session.create(); + final MockFlowFile ffPriLN = session.create(); ffPriLN.putAttributes(attrsPriLN); - PriorityAttributePrioritizer prioritizer = new PriorityAttributePrioritizer(); + final PriorityAttributePrioritizer prioritizer = new PriorityAttributePrioritizer(); assertEquals(0, prioritizer.compare(null, null)); assertEquals(-1, prioritizer.compare(ffNoPriority, null)); assertEquals(1, prioritizer.compare(null, ffNoPriority));
