Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 8ed8d6989 -> cbea1f193


NIFI-72: Auto-generate CREATE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED events when appropriate


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6b0a5e8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6b0a5e8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6b0a5e8c

Branch: refs/heads/develop
Commit: 6b0a5e8cd75a03a13ca90b08204bccc2a45bee70
Parents: 8ed8d69
Author: Mark Payne <[email protected]>
Authored: Thu Dec 11 09:03:05 2014 -0500
Committer: Mark Payne <[email protected]>
Committed: Thu Dec 11 09:03:05 2014 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      |  83 +++++++-
 .../repository/StandardProvenanceReporter.java  |   4 +-
 .../repository/TestStandardProcessSession.java  | 211 +++++++++++++------
 3 files changed, 232 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b0a5e8c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 08e6afe..4ba45aa 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -75,7 +75,6 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -488,6 +487,16 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
     }
 
+    /**
+     * Records, in the supplied index, that an event of the given type has been
+     * registered for the FlowFile with the given UUID. The per-FlowFile set is
+     * created lazily the first time a UUID is seen; the map is updated in place.
+     *
+     * @param map index from FlowFile UUID to the event types registered for it
+     * @param id the FlowFile UUID to record the event type under
+     * @param eventType the event type to add to that FlowFile's set
+     */
+    private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
+        final Set<ProvenanceEventType> existing = map.get(id);
+        final Set<ProvenanceEventType> target;
+        if (existing != null) {
+            target = existing;
+        } else {
+            // first event seen for this FlowFile: start a new set and index it
+            target = new HashSet<>();
+            map.put(id, target);
+        }
+        target.add(eventType);
+    }
+    
     private void updateProvenanceRepo(final Checkpoint checkpoint) {
         // Update Provenance Repository
         final ProvenanceEventRepository provenanceRepo = 
context.getProvenanceRepository();
@@ -496,7 +505,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         // in case the Processor developer submitted the same events to the 
reporter. So we use a LinkedHashSet
         // for this, so that we are able to ensure that the events are 
submitted in the proper order.
         final Set<ProvenanceEventRecord> recordsToSubmit = new 
LinkedHashSet<>();
-
+        final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = 
new HashMap<>();
+        
         final Set<ProvenanceEventRecord> processorGenerated = 
checkpoint.reportedEvents;
 
         // We first want to submit FORK events because if the Processor is 
going to create events against
@@ -513,6 +523,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
             if (!event.getChildUuids().isEmpty() && 
!isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && 
!processorGenerated.contains(event)) {
                 recordsToSubmit.add(event);
+                
+                for ( final String childUuid : event.getChildUuids() ) {
+                    addEventType(eventTypesPerFlowFileId, childUuid, 
event.getEventType());
+                }
+                for ( final String parentUuid : event.getParentUuids() ) {
+                    addEventType(eventTypesPerFlowFileId, parentUuid, 
event.getEventType());
+                }
             }
         }
 
@@ -523,6 +540,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             }
 
             recordsToSubmit.add(event);
+            addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), 
event.getEventType());
         }
 
         // Finally, add any other events that we may have generated.
@@ -533,6 +551,67 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 }
 
                 recordsToSubmit.add(event);
+                addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), 
event.getEventType());
+            }
+        }
+        
+        // Check if content or attributes changed. If so, register the 
appropriate events.
+        for (final StandardRepositoryRecord repoRecord : 
checkpoint.records.values() ) {
+            final ContentClaim original = repoRecord.getOriginalClaim();
+            final ContentClaim current = repoRecord.getCurrentClaim();
+            
+            boolean contentChanged = false;
+            if ( original == null && current != null ) {
+                contentChanged = true;
+            }
+            if ( original != null && current == null ) {
+                contentChanged = true;
+            }
+            if ( original != null && current != null && 
!original.equals(current) ) {
+                contentChanged = true;
+            }
+            
+            final FlowFileRecord curFlowFile = repoRecord.getCurrent();
+            final String flowFileId = 
curFlowFile.getAttribute(CoreAttributes.UUID.key());
+            boolean eventAdded = false;
+            
+            if (checkpoint.removedFlowFiles.contains(flowFileId)) {
+                continue;
+            }
+            
+            if ( contentChanged ) {
+                recordsToSubmit.add(provenanceReporter.build(curFlowFile, 
ProvenanceEventType.CONTENT_MODIFIED).build());
+                addEventType(eventTypesPerFlowFileId, flowFileId, 
ProvenanceEventType.CONTENT_MODIFIED);
+                eventAdded = true;
+            }
+            
+            if ( checkpoint.createdFlowFiles.contains(flowFileId) ) {
+                final Set<ProvenanceEventType> registeredTypes = 
eventTypesPerFlowFileId.get(flowFileId);
+                boolean creationEventRegistered = false;
+                if ( registeredTypes != null ) {
+                    if ( registeredTypes.contains(ProvenanceEventType.CREATE) 
||
+                            registeredTypes.contains(ProvenanceEventType.FORK) 
||
+                            registeredTypes.contains(ProvenanceEventType.JOIN) 
||
+                            
registeredTypes.contains(ProvenanceEventType.RECEIVE) ) {
+                        creationEventRegistered = true;
+                    }
+                }
+                
+                if ( !creationEventRegistered ) {
+                    recordsToSubmit.add(provenanceReporter.build(curFlowFile, 
ProvenanceEventType.CREATE).build());
+                    eventAdded = true;
+                }
+            }
+            
+            if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) 
{
+                // We generate an ATTRIBUTES_MODIFIED event only if no other 
event has been
+                // created for the FlowFile. We do this because all events 
contain both the
+                // newest and the original attributes, so generating an 
ATTRIBUTES_MODIFIED
+                // event is redundant if another already exists.
+                if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) {
+                    recordsToSubmit.add(provenanceReporter.build(curFlowFile, 
ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
+                    addEventType(eventTypesPerFlowFileId, flowFileId, 
ProvenanceEventType.ATTRIBUTES_MODIFIED);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b0a5e8c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index e8b1e87..01fb3dc 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -328,7 +328,7 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
             }
         }
     }
-
+    
     @Override
     public void modifyContent(final FlowFile flowFile) {
         modifyContent(flowFile, null, -1L);
@@ -421,7 +421,7 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter {
         }
     }
 
-    private ProvenanceEventBuilder build(final FlowFile flowFile, final 
ProvenanceEventType eventType) {
+    ProvenanceEventBuilder build(final FlowFile flowFile, final 
ProvenanceEventType eventType) {
         final ProvenanceEventBuilder builder = repository.eventBuilder();
         builder.setEventType(eventType);
         builder.fromFlowFile(flowFile);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b0a5e8c/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 6e0a5d7..3dbbcf3 100644
--- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.repository;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
@@ -64,7 +65,6 @@ import 
org.apache.nifi.provenance.MockProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -267,7 +267,7 @@ public class TestStandardProcessSession {
     }
 
     @Test
-    public void testSpawnsNotEmittedIfFilesDeleted() throws IOException {
+    public void testForksNotEmittedIfFilesDeleted() throws IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
@@ -283,8 +283,9 @@ public class TestStandardProcessSession {
         assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
     }
 
+    
     @Test
-    public void testProvenanceEventsEmittedForSpawnIfNotRemoved() throws 
IOException {
+    public void testProvenanceEventsEmittedForForkIfNotRemoved() throws 
IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
@@ -320,6 +321,79 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    /**
+     * Two parents have their attributes updated and are then joined into a new child.
+     * Expects exactly one JOIN event (for the child, referencing both parents) plus one
+     * ATTRIBUTES_MODIFIED event per parent.
+     */
+    public void testUpdateAttributesThenJoin() throws IOException {
+        final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
+            .id(1L)
+            .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+            .entryDate(System.currentTimeMillis())
+            .build();
+
+        final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
+            .id(2L)
+            .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+            .entryDate(System.currentTimeMillis())
+            .build();
+
+        flowFileQueue.put(flowFileRecord1);
+        flowFileQueue.put(flowFileRecord2);
+
+        FlowFile ff1 = session.get();
+        FlowFile ff2 = session.get();
+
+        ff1 = session.putAttribute(ff1, "index", "1");
+        ff2 = session.putAttribute(ff2, "index", "2");
+
+        final List<FlowFile> parents = new ArrayList<>(2);
+        parents.add(ff1);
+        parents.add(ff2);
+
+        final FlowFile child = session.create(parents);
+
+        final Relationship rel = new Relationship.Builder().name("A").build();
+
+        session.transfer(ff1, rel);
+        session.transfer(ff2, rel);
+        session.transfer(child, rel);
+
+        session.commit();
+
+        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
+
+        // We should have a JOIN and 2 ATTRIBUTES_MODIFIED events
+        assertEquals(3, events.size());
+
+        final String ff1Uuid = ff1.getAttribute("uuid");
+        final String ff2Uuid = ff2.getAttribute("uuid");
+
+        int joinCount = 0;
+        int ff1UpdateCount = 0;
+        int ff2UpdateCount = 0;
+
+        for (final ProvenanceEventRecord event : events) {
+            switch (event.getEventType()) {
+                case JOIN:
+                    assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
+                    // the JOIN must link the child back to both of its parents
+                    assertTrue(event.getParentUuids().contains(ff1Uuid));
+                    assertTrue(event.getParentUuids().contains(ff2Uuid));
+                    joinCount++;
+                    break;
+                case ATTRIBUTES_MODIFIED:
+                    if (event.getFlowFileUuid().equals(ff1Uuid)) {
+                        ff1UpdateCount++;
+                    } else if (event.getFlowFileUuid().equals(ff2Uuid)) {
+                        ff2UpdateCount++;
+                    } else {
+                        Assert.fail("Got ATTRIBUTES_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
+                    }
+                    break;
+                default:
+                    Assert.fail("Unexpected event type: " + event);
+            }
+        }
+
+        assertEquals(1, joinCount);
+        assertEquals(1, ff1UpdateCount);
+        assertEquals(1, ff2UpdateCount);
+    }
+    
+    @Test
     public void testForkOneToOneReported() throws IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
@@ -628,34 +702,34 @@ public class TestStandardProcessSession {
     @Test
     public void 
testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                }).build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new ContentClaim() {
+                @Override
+                public int compareTo(ContentClaim arg0) {
+                    return 0;
+                }
+    
+                @Override
+                public String getId() {
+                    return "0";
+                }
+    
+                @Override
+                public String getContainer() {
+                    return "container";
+                }
+    
+                @Override
+                public String getSection() {
+                    return "section";
+                }
+    
+                @Override
+                public boolean isLossTolerant() {
+                    return true;
+                }
+            }).build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -668,37 +742,35 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .contentClaimOffset(1000L)
-                .size(1L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new ContentClaim() {
+                @Override
+                public int compareTo(ContentClaim arg0) {
+                    return 0;
+                }
+    
+                @Override
+                public String getId() {
+                    return "0";
+                }
+    
+                @Override
+                public String getContainer() {
+                    return "container";
+                }
+    
+                @Override
+                public String getSection() {
+                    return "section";
+                }
+    
+                @Override
+                public boolean isLossTolerant() {
+                    return true;
+                }
+            })
+            .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
@@ -759,6 +831,21 @@ public class TestStandardProcessSession {
         }
     }
 
+    
+    @Test
+    /**
+     * A FlowFile created by the session with no processor-reported creation event
+     * should cause the session to emit a single CREATE event for that FlowFile on commit.
+     */
+    public void testCreateEmitted() throws IOException {
+        final FlowFile newFlowFile = session.create();
+        session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
+        session.commit();
+
+        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+        assertFalse(events.isEmpty());
+        assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+        // the CREATE event must be attributed to the FlowFile that was actually created
+        assertEquals(newFlowFile.getAttribute("uuid"), event.getFlowFileUuid());
+    }
+    
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private final AtomicLong idGenerator = new AtomicLong(0L);

Reply via email to