Repository: incubator-nifi Updated Branches: refs/heads/develop a8722317a -> bc9462514
NIFI-70: If ROUTE indicates FlowFile routed to same Connection it came from (and only that connection!) then drop the redundant ROUTE event Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bc946251 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bc946251 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bc946251 Branch: refs/heads/develop Commit: bc94625142163d60af7aad27ec9de11b79c2ba21 Parents: a872231 Author: Mark Payne <[email protected]> Authored: Fri Dec 12 12:02:49 2014 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Dec 12 12:02:49 2014 -0500 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 48 +++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bc946251/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index fbbb29b..60dcdb3 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -538,7 +538,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { continue; } - + if ( isSpuriousRouteEvent(event, checkpoint.records) ) { + continue; + } + + // Check if the event indicates that the FlowFile was routed to the same + // connection from which it was pulled (and only this connection). If so, discard the event. + isSpuriousRouteEvent(event, checkpoint.records); + recordsToSubmit.add(event); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } @@ -776,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return false; } + + /** + * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile + * was routed to a relationship with only 1 connection and that Connection is the Connection from which + * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere. + * + * @param event + * @param records + * @return + */ + private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) { + if ( event.getEventType() == ProvenanceEventType.ROUTE ) { + final String relationshipName = event.getRelationship(); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship); + + // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event, + // as it may be cloning the FlowFile and adding to multiple connections. + if ( connectionsForRelationship.size() == 1 ) { + for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) { + final FlowFileRecord flowFileRecord = entry.getKey(); + if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) { + final StandardRepositoryRecord repoRecord = entry.getValue(); + if ( repoRecord.getOriginalQueue() == null ) { + return false; + } + + final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier(); + final Connection destinationConnection = connectionsForRelationship.iterator().next(); + final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier(); + return originalQueueId.equals(destinationQueueId); + } + } + } + } + + return false; + } + @Override public void rollback() { rollback(false);
