Repository: incubator-nifi
Updated Branches:
  refs/heads/develop a8722317a -> bc9462514


NIFI-70: If a ROUTE event indicates that a FlowFile was routed to the same 
Connection it came from (and only that Connection!), drop the redundant ROUTE event


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bc946251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bc946251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bc946251

Branch: refs/heads/develop
Commit: bc94625142163d60af7aad27ec9de11b79c2ba21
Parents: a872231
Author: Mark Payne <[email protected]>
Authored: Fri Dec 12 12:02:49 2014 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Dec 12 12:02:49 2014 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 48 +++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bc946251/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index fbbb29b..60dcdb3 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -538,7 +538,14 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                 continue;
             }
-
+            if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
+                continue;
+            }
+            
+            // Check if the event indicates that the FlowFile was routed to 
the same 
+            // connection from which it was pulled (and only this connection). 
If so, discard the event.
+            isSpuriousRouteEvent(event, checkpoint.records);
+            
             recordsToSubmit.add(event);
             addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), 
event.getEventType());
         }
@@ -776,6 +783,45 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         return false;
     }
 
+    
+    /**
+     * Checks if the given event is a spurious ROUTE, meaning that the ROUTE 
indicates that a FlowFile
+     * was routed to a relationship with only 1 connection and that Connection 
is the Connection from which
+     * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
+     * 
+     * @param event the provenance event to examine; any event type other than ROUTE yields false
+     * @param records repository records keyed by FlowFile, searched for the entry whose UUID attribute equals the UUID of the event
+     * @return true if the relationship has exactly one connection and the identifier of its queue equals that of the original queue of the matching record; false otherwise, including when no record matches or the original queue is null
+     */
+    private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, 
final Map<FlowFileRecord, StandardRepositoryRecord> records) {
+        if ( event.getEventType() == ProvenanceEventType.ROUTE ) {
+            final String relationshipName = event.getRelationship();
+            final Relationship relationship = new 
Relationship.Builder().name(relationshipName).build();
+            final Collection<Connection> connectionsForRelationship = 
this.context.getConnections(relationship);
+            
+            // If the number of connections for this relationship is not 1, 
then we can't ignore this ROUTE event,
+            // as it may be cloning the FlowFile and adding to multiple 
connections.
+            if ( connectionsForRelationship.size() == 1 ) {
+                for ( final Map.Entry<FlowFileRecord, 
StandardRepositoryRecord> entry : records.entrySet() ) {
+                    final FlowFileRecord flowFileRecord = entry.getKey();
+                    if ( 
event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()))
 ) {
+                        final StandardRepositoryRecord repoRecord = 
entry.getValue();
+                        if ( repoRecord.getOriginalQueue() == null ) {
+                            return false;
+                        }
+                        
+                        final String originalQueueId = 
repoRecord.getOriginalQueue().getIdentifier();
+                        final Connection destinationConnection = 
connectionsForRelationship.iterator().next();
+                        final String destinationQueueId = 
destinationConnection.getFlowFileQueue().getIdentifier();
+                        return originalQueueId.equals(destinationQueueId);
+                    }
+                }
+            }
+        }
+        
+        return false;
+    }
+    
     @Override
     public void rollback() {
         rollback(false);

Reply via email to