Repository: nifi
Updated Branches:
  refs/heads/master 65d895827 -> ae9e2fdf0


http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9e2fdf/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 3238d97..5417669 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -19,6 +19,7 @@ package org.apache.nifi.provenance;
 import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -51,6 +53,8 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
@@ -173,7 +177,7 @@ public class TestPersistentProvenanceRepository {
         config.setJournalCount(10);
         config.setQueryThreadPoolSize(10);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -221,7 +225,7 @@ public class TestPersistentProvenanceRepository {
         System.out.println("Closing and re-initializing");
         repo.close();
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
         System.out.println("Re-initialized");
 
         final long fetchStart = System.nanoTime();
@@ -241,7 +245,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileCapacity(1L);
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -267,7 +271,7 @@ public class TestPersistentProvenanceRepository {
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all 
file handles, etc.)
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
         final List<ProvenanceEventRecord> recoveredRecords = 
repo.getEvents(0L, 12);
 
         assertEquals(10, recoveredRecords.size());
@@ -290,7 +294,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(2, TimeUnit.SECONDS);
         config.setSearchableFields(searchableFields);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -331,7 +335,7 @@ public class TestPersistentProvenanceRepository {
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, 
"XXXX"));
         query.setMaxResults(100);
 
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(2, result.getMatchingEvents().size());
         for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
             System.out.println(match);
@@ -344,7 +348,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -375,7 +379,7 @@ public class TestPersistentProvenanceRepository {
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
         query.setMaxResults(100);
 
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(10, result.getMatchingEvents().size());
         for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
             System.out.println(match);
@@ -388,7 +392,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setCompressOnRollover(true);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -426,7 +430,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "10000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -458,7 +462,7 @@ public class TestPersistentProvenanceRepository {
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
         query.setMaxResults(100);
 
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(10, result.getMatchingEvents().size());
         for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
             System.out.println(match);
@@ -471,7 +475,7 @@ public class TestPersistentProvenanceRepository {
         repo.purgeOldEvents();
         Thread.sleep(2000L);
 
-        final QueryResult newRecordSet = repo.queryEvents(query);
+        final QueryResult newRecordSet = repo.queryEvents(query, createUser());
         assertTrue(newRecordSet.getMatchingEvents().isEmpty());
     }
 
@@ -486,7 +490,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -525,7 +529,7 @@ public class TestPersistentProvenanceRepository {
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
         query.setMaxResults(100);
 
-        final QuerySubmission submission = repo.submitQuery(query);
+        final QuerySubmission submission = repo.submitQuery(query, 
createUser());
         while (!submission.getResult().isFinished()) {
             Thread.sleep(100L);
         }
@@ -555,7 +559,7 @@ public class TestPersistentProvenanceRepository {
         Thread.sleep(2000L); // purge is async. Give it time to do its job.
 
         query.setMaxResults(100);
-        final QuerySubmission noResultSubmission = repo.submitQuery(query);
+        final QuerySubmission noResultSubmission = repo.submitQuery(query, 
createUser());
         while (!noResultSubmission.getResult().isFinished()) {
             Thread.sleep(10L);
         }
@@ -573,7 +577,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -601,7 +605,7 @@ public class TestPersistentProvenanceRepository {
         final Query query = new Query(UUID.randomUUID().toString());
         query.setMaxResults(100);
 
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(10, result.getMatchingEvents().size());
         for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
             System.out.println(match);
@@ -615,7 +619,7 @@ public class TestPersistentProvenanceRepository {
 
         Thread.sleep(1000L);
 
-        final QueryResult newRecordSet = repo.queryEvents(query);
+        final QueryResult newRecordSet = repo.queryEvents(query, createUser());
         assertTrue(newRecordSet.getMatchingEvents().isEmpty());
     }
 
@@ -629,7 +633,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000001";
         final Map<String, String> attributes = new HashMap<>();
@@ -655,7 +659,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.waitForRollover();
 
-        final Lineage lineage = repo.computeLineage(uuid);
+        final Lineage lineage = repo.computeLineage(uuid, createUser());
         assertNotNull(lineage);
 
         // Nodes should consist of a RECEIVE followed by FlowFileNode, 
followed by a DROP
@@ -684,7 +688,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000001";
         final Map<String, String> attributes = new HashMap<>();
@@ -710,7 +714,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.waitForRollover();
 
-        final AsyncLineageSubmission submission = 
repo.submitLineageComputation(uuid);
+        final AsyncLineageSubmission submission = 
repo.submitLineageComputation(uuid, createUser());
         while (!submission.getResult().isFinished()) {
             Thread.sleep(100L);
         }
@@ -743,7 +747,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String childId = "00000000-0000-0000-0000-000000000000";
 
@@ -773,7 +777,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.waitForRollover();
 
-        final Lineage lineage = repo.computeLineage(childId);
+        final Lineage lineage = repo.computeLineage(childId, createUser());
         assertNotNull(lineage);
 
         // these are not necessarily accurate asserts....
@@ -793,7 +797,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String childId = "00000000-0000-0000-0000-000000000000";
 
@@ -823,7 +827,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.waitForRollover();
 
-        final AsyncLineageSubmission submission = 
repo.submitLineageComputation(childId);
+        final AsyncLineageSubmission submission = 
repo.submitLineageComputation(childId, createUser());
         while (!submission.getResult().isFinished()) {
             Thread.sleep(100L);
         }
@@ -840,7 +844,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -866,13 +870,13 @@ public class TestPersistentProvenanceRepository {
         repo.close();
 
         final PersistentProvenanceRepository secondRepo = new 
PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        secondRepo.initialize(getEventReporter());
+        secondRepo.initialize(getEventReporter(), null, null);
 
         try {
             final ProvenanceEventRecord event11 = builder.build();
             secondRepo.registerEvent(event11);
             secondRepo.waitForRollover();
-            final ProvenanceEventRecord event11Retrieved = 
secondRepo.getEvent(10L);
+            final ProvenanceEventRecord event11Retrieved = 
secondRepo.getEvent(10L, null);
             assertNotNull(event11Retrieved);
             assertEquals(10, event11Retrieved.getEventId());
         } finally {
@@ -897,7 +901,7 @@ public class TestPersistentProvenanceRepository {
         in.writeInt(4);
         in.close();
         assertTrue(eventFile.exists());
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(10, result.getMatchingEvents().size());
     }
 
@@ -914,7 +918,7 @@ public class TestPersistentProvenanceRepository {
         query.setMaxResults(100);
         DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new 
FileOutputStream(eventFile)));
         in.close();
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(10, result.getMatchingEvents().size());
     }
 
@@ -927,7 +931,7 @@ public class TestPersistentProvenanceRepository {
         config.setDesiredIndexSize(10);
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         String uuid = UUID.randomUUID().toString();
         for (int i = 0; i < 20; i++) {
@@ -957,7 +961,7 @@ public class TestPersistentProvenanceRepository {
         config.setDesiredIndexSize(10); // force new index to be created for 
each rollover
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1003,7 +1007,7 @@ public class TestPersistentProvenanceRepository {
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
         query.setMaxResults(100);
 
-        final QueryResult result = repo.queryEvents(query);
+        final QueryResult result = repo.queryEvents(query, createUser());
         assertEquals(20, result.getMatchingEvents().size());
 
         // Ensure index directories exists
@@ -1022,7 +1026,7 @@ public class TestPersistentProvenanceRepository {
         repo.purgeOldEvents();
         Thread.sleep(2000L);
 
-        final QueryResult newRecordSet = repo.queryEvents(query);
+        final QueryResult newRecordSet = repo.queryEvents(query, createUser());
         assertEquals(10, newRecordSet.getMatchingEvents().size());
 
         // Ensure that one index directory is gone
@@ -1030,6 +1034,291 @@ public class TestPersistentProvenanceRepository {
         assertEquals(1, indexDirs.length);
     }
 
+    @Test
+    public void testNotAuthorizedGetSpecificEvent() throws IOException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(5, TimeUnit.MINUTES);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10); // force new index to be created for 
each rollover
+
+        final AccessDeniedException expectedException = new 
AccessDeniedException("Unit Test - Intentionally Thrown");
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected void authorize(ProvenanceEventRecord event, NiFiUser 
user) {
+                throw expectedException;
+            }
+        };
+
+        repo.initialize(getEventReporter(), null, null);
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(10L); // make sure the events are destroyed 
when we call purge
+            repo.registerEvent(builder.build());
+        }
+
+        repo.waitForRollover();
+
+        try {
+            repo.getEvent(0L, null);
+            Assert.fail("getEvent() did not throw an Exception");
+        } catch (final Exception e) {
+            Assert.assertSame(expectedException, e);
+        }
+    }
+
+    @Test
+    public void testNotAuthorizedGetEventRange() throws IOException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(5, TimeUnit.MINUTES);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10); // force new index to be created for 
each rollover
+
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser 
user) {
+                return event.getEventId() > 2;
+            }
+        };
+
+        repo.initialize(getEventReporter(), null, null);
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(10L); // make sure the events are destroyed 
when we call purge
+            repo.registerEvent(builder.build());
+        }
+
+        repo.waitForRollover();
+
+        final List<ProvenanceEventRecord> events = repo.getEvents(0L, 10, 
null);
+
+        // Ensure that we get only the authorized events, i.e. IDs 3 through 9.
+        assertEquals(7, events.size());
+        final List<Long> eventIds = events.stream().map(event -> 
event.getEventId()).sorted().collect(Collectors.toList());
+        for (int i = 0; i < 7; i++) {
+            Assert.assertEquals(i + 3, eventIds.get(i).intValue());
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testNotAuthorizedQuery() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(5, TimeUnit.MINUTES);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10); // force new index to be created for 
each rollover
+
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser 
user) {
+                return event.getEventId() > 2;
+            }
+        };
+
+        repo.initialize(getEventReporter(), null, null);
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(10L); // make sure the events are destroyed 
when we call purge
+            repo.registerEvent(builder.build());
+        }
+
+        repo.waitForRollover();
+
+        final Query query = new Query("1234");
+        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, 
"1234"));
+        final QuerySubmission submission = repo.submitQuery(query, 
createUser());
+
+        final QueryResult result = submission.getResult();
+        while (!result.isFinished()) {
+            Thread.sleep(100L);
+        }
+
+        // Ensure that we get only the authorized events, i.e. IDs 3 through 9.
+        final List<ProvenanceEventRecord> events = result.getMatchingEvents();
+        assertEquals(7, events.size());
+        final List<Long> eventIds = events.stream().map(event -> 
event.getEventId()).sorted().collect(Collectors.toList());
+        for (int i = 0; i < 7; i++) {
+            Assert.assertEquals(i + 3, eventIds.get(i).intValue());
+        }
+    }
+
+
+    @Test(timeout = 1000000)
+    public void testNotAuthorizedLineage() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(5, TimeUnit.MINUTES);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10); // force new index to be created for 
each rollover
+
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser 
user) {
+                return event.getEventType() != 
ProvenanceEventType.ATTRIBUTES_MODIFIED;
+            }
+        };
+
+        repo.initialize(getEventReporter(), null, null);
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+        attributes.put("uuid", uuid);
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        builder.setEventTime(10L); // make sure the events are destroyed when 
we call purge
+
+        builder.fromFlowFile(createFlowFile(1, 3000L, attributes));
+        repo.registerEvent(builder.build());
+
+        builder.setEventType(ProvenanceEventType.CONTENT_MODIFIED);
+        builder.fromFlowFile(createFlowFile(2, 2000L, attributes));
+        repo.registerEvent(builder.build());
+
+        builder.setEventType(ProvenanceEventType.CONTENT_MODIFIED);
+        builder.fromFlowFile(createFlowFile(3, 2000L, attributes));
+        repo.registerEvent(builder.build());
+
+        builder.setEventType(ProvenanceEventType.ATTRIBUTES_MODIFIED);
+        attributes.put("new-attr", "yes");
+        builder.fromFlowFile(createFlowFile(4, 2000L, attributes));
+        repo.registerEvent(builder.build());
+
+        final Map<String, String> childAttributes = new HashMap<>(attributes);
+        childAttributes.put("uuid", "00000000-0000-0000-0000-000000000001");
+        builder.setEventType(ProvenanceEventType.FORK);
+        builder.fromFlowFile(createFlowFile(4, 2000L, attributes));
+        builder.addChildFlowFile(createFlowFile(5, 2000L, childAttributes));
+        builder.addParentFlowFile(createFlowFile(4, 2000L, attributes));
+        repo.registerEvent(builder.build());
+
+        builder.setEventType(ProvenanceEventType.ATTRIBUTES_MODIFIED);
+        builder.fromFlowFile(createFlowFile(6, 2000L, childAttributes));
+        repo.registerEvent(builder.build());
+
+        builder.setEventType(ProvenanceEventType.DROP);
+        builder.fromFlowFile(createFlowFile(6, 2000L, childAttributes));
+        repo.registerEvent(builder.build());
+
+        repo.waitForRollover();
+
+        final AsyncLineageSubmission originalLineage = 
repo.submitLineageComputation(uuid, createUser());
+
+        final StandardLineageResult result = originalLineage.getResult();
+        while (!result.isFinished()) {
+            Thread.sleep(100L);
+        }
+
+        final List<LineageNode> lineageNodes = result.getNodes();
+        assertEquals(6, lineageNodes.size());
+
+        assertEquals(1, lineageNodes.stream().map(node -> 
node.getNodeType()).filter(t -> t == LineageNodeType.FLOWFILE_NODE).count());
+        assertEquals(5, lineageNodes.stream().map(node -> 
node.getNodeType()).filter(t -> t == 
LineageNodeType.PROVENANCE_EVENT_NODE).count());
+
+        final Set<EventNode> eventNodes = lineageNodes.stream()
+            .filter(node -> node.getNodeType() == 
LineageNodeType.PROVENANCE_EVENT_NODE)
+            .map(node -> (EventNode) node)
+            .collect(Collectors.toSet());
+
+        final Map<ProvenanceEventType, List<EventNode>> nodesByType = 
eventNodes.stream().collect(Collectors.groupingBy(EventNode::getEventType));
+        assertEquals(1, nodesByType.get(ProvenanceEventType.RECEIVE).size());
+        assertEquals(2, 
nodesByType.get(ProvenanceEventType.CONTENT_MODIFIED).size());
+        assertEquals(1, nodesByType.get(ProvenanceEventType.FORK).size());
+
+        assertEquals(1, nodesByType.get(ProvenanceEventType.UNKNOWN).size());
+        assertNull(nodesByType.get(ProvenanceEventType.ATTRIBUTES_MODIFIED));
+
+        // Test filtering on expandChildren
+        final AsyncLineageSubmission expandChild = 
repo.submitExpandChildren(4L, createUser());
+        final StandardLineageResult expandChildResult = 
expandChild.getResult();
+        while (!expandChildResult.isFinished()) {
+            Thread.sleep(100L);
+        }
+
+        final List<LineageNode> expandChildNodes = 
expandChildResult.getNodes();
+        assertEquals(4, expandChildNodes.size());
+
+        assertEquals(1, expandChildNodes.stream().map(node -> 
node.getNodeType()).filter(t -> t == LineageNodeType.FLOWFILE_NODE).count());
+        assertEquals(3, expandChildNodes.stream().map(node -> 
node.getNodeType()).filter(t -> t == 
LineageNodeType.PROVENANCE_EVENT_NODE).count());
+
+        final Set<EventNode> childEventNodes = expandChildNodes.stream()
+            .filter(node -> node.getNodeType() == 
LineageNodeType.PROVENANCE_EVENT_NODE)
+            .map(node -> (EventNode) node)
+            .collect(Collectors.toSet());
+
+        final Map<ProvenanceEventType, List<EventNode>> childNodesByType = 
childEventNodes.stream().collect(Collectors.groupingBy(EventNode::getEventType));
+        assertEquals(1, childNodesByType.get(ProvenanceEventType.FORK).size());
+        assertEquals(1, childNodesByType.get(ProvenanceEventType.DROP).size());
+        assertEquals(1, 
childNodesByType.get(ProvenanceEventType.UNKNOWN).size());
+        
assertNull(childNodesByType.get(ProvenanceEventType.ATTRIBUTES_MODIFIED));
+    }
+
+
 
     @Test
     public void testBackPressure() throws IOException, InterruptedException {
@@ -1045,7 +1334,7 @@ public class TestPersistentProvenanceRepository {
                 return journalCountRef.get();
             }
         };
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
         final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
@@ -1103,7 +1392,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1170,7 +1459,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -1221,7 +1510,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxAttributeChars(50);
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("75chars", 
"123456789012345678901234567890123456789012345678901234567890123456789012345");
@@ -1240,7 +1529,7 @@ public class TestPersistentProvenanceRepository {
         repo.registerEvent(record);
         repo.waitForRollover();
 
-        final ProvenanceEventRecord retrieved = repo.getEvent(0L);
+        final ProvenanceEventRecord retrieved = repo.getEvent(0L, null);
         assertNotNull(retrieved);
         assertEquals("12345678-0000-0000-0000-012345678912", 
retrieved.getAttributes().get("uuid"));
         assertEquals("12345678901234567890123456789012345678901234567890", 
retrieved.getAttributes().get("75chars"));
@@ -1269,7 +1558,7 @@ public class TestPersistentProvenanceRepository {
         };
 
         // initialize with our event reporter
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         // create some events in the journal files.
         final Map<String, String> attributes = new HashMap<>();
@@ -1346,7 +1635,7 @@ public class TestPersistentProvenanceRepository {
                 return spiedWriters;
             }
         };
-        repo.initialize(getEventReporter());
+        repo.initialize(getEventReporter(), null, null);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("75chars", 
"123456789012345678901234567890123456789012345678901234567890123456789012345");
@@ -1407,7 +1696,7 @@ public class TestPersistentProvenanceRepository {
         };
 
         try {
-            recoveryRepo.initialize(getEventReporter());
+            recoveryRepo.initialize(getEventReporter(), null, null);
         } finally {
             recoveryRepo.close();
         }
@@ -1437,4 +1726,29 @@ public class TestPersistentProvenanceRepository {
             return severity;
         }
     }
+
+    /**
+     * Builds a fixed, non-anonymous stub user for the tests in this class that must pass a
+     * {@link NiFiUser} to the repository.
+     *
+     * @return a user whose identity is "unit-test", whose user name is "Unit Test", whose chain
+     *         is {@code null}, and for which {@code isAnonymous()} is {@code false}
+     */
+    private NiFiUser createUser() {
+        return new NiFiUser() {
+            @Override
+            public String getIdentity() {
+                return "unit-test";
+            }
+
+            @Override
+            public String getUserName() {
+                return "Unit Test";
+            }
+
+            @Override
+            public NiFiUser getChain() {
+                return null;
+            }
+
+            @Override
+            public boolean isAnonymous() {
+                return false;
+            }
+        };
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9e2fdf/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index f4f9d12..888f55a 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -37,6 +37,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
@@ -55,6 +62,7 @@ import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.Filter;
 import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.RingBuffer.IterationDirection;
+import org.apache.nifi.web.ResourceNotFoundException;
 
 public class VolatileProvenanceRepository implements ProvenanceEventRepository 
{
 
@@ -75,6 +83,9 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     private final AtomicLong idGenerator = new AtomicLong(0L);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
+    private Authorizer authorizer;  // effectively final
+    private ProvenanceAuthorizableFactory resourceFactory;  // effectively 
final
+
     public VolatileProvenanceRepository() {
         final NiFiProperties properties = NiFiProperties.getInstance();
 
@@ -103,11 +114,14 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
+    /**
+     * Initializes this repository on the first call; every later call returns immediately
+     * because the {@code initialized} flag has already been set.
+     *
+     * @param eventReporter not referenced by this implementation
+     * @param authorizer the authorizer consulted by {@code isAuthorized} and {@code authorize};
+     *            when {@code null}, both of those methods skip the authorization check
+     * @param resourceFactory the factory used to resolve an event's component id to an authorizable
+     * @throws IOException declared by the signature; nothing in this body throws it
+     */
     @Override
-    public void initialize(final EventReporter eventReporter) {
+    public void initialize(final EventReporter eventReporter, final Authorizer 
authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws 
IOException {
         if (initialized.getAndSet(true)) {
             return;
         }
 
+        // Assigned only here, behind the one-shot guard above.
+        this.authorizer = authorizer;
+        this.resourceFactory = resourceFactory;
+
+        // Run RemoveExpiredQueryResults every 30 seconds, starting 30 seconds from now.
         scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 30L, TimeUnit.SECONDS);
     }
 
@@ -131,9 +145,18 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
 
     @Override
     public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords) throws IOException {
+        return getEvents(firstRecordId, maxRecords, null);
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords, final NiFiUser user) throws IOException {
         return ringBuffer.getSelectedElements(new 
Filter<ProvenanceEventRecord>() {
             @Override
             public boolean select(final ProvenanceEventRecord value) {
+                if (user != null && !isAuthorized(value, user)) {
+                    return false;
+                }
+
                 return value.getEventId() >= firstRecordId;
             }
         }, maxRecords);
@@ -155,8 +178,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         return records.isEmpty() ? null : records.get(0);
     }
 
-    @Override
-    public ProvenanceEventRecord getEvent(final long id) {
+    private ProvenanceEventRecord getEvent(final long id) {
         final List<ProvenanceEventRecord> records = 
ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() {
             @Override
             public boolean select(final ProvenanceEventRecord event) {
@@ -168,6 +190,17 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
+    /**
+     * Looks up an event by id and verifies that the given user may read it.
+     *
+     * @param id the id of the event to retrieve
+     * @param user the user requesting the event
+     * @return the matching event, or {@code null} if the lookup by id yields none
+     */
     @Override
+    public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) {
+        final ProvenanceEventRecord event = getEvent(id);
+        if (event == null) {
+            return null;
+        }
+
+        // Authorization runs only once an event has been found; authorize() returns without
+        // checking anything when no authorizer is configured.
+        authorize(event, user);
+        return event;
+    }
+
+    @Override
     public void close() throws IOException {
         queryExecService.shutdownNow();
         scheduledExecService.shutdown();
@@ -183,8 +216,8 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         return searchableAttributes;
     }
 
-    public QueryResult queryEvents(final Query query) throws IOException {
-        final QuerySubmission submission = submitQuery(query);
+    public QueryResult queryEvents(final Query query, final NiFiUser user) 
throws IOException {
+        final QuerySubmission submission = submitQuery(query, user);
         final QueryResult result = submission.getResult();
         while (!result.isFinished()) {
             try {
@@ -200,10 +233,40 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         return result;
     }
 
-    private Filter<ProvenanceEventRecord> createFilter(final Query query) {
+
+    /**
+     * Reports whether the given user may read the given provenance event. Unlike
+     * {@code authorize}, a failed check is reported through the return value.
+     *
+     * @param event the event whose component id selects the resource to check
+     * @param user the user on whose behalf the READ check is made
+     * @return {@code true} if no authorizer is configured or the READ check is approved;
+     *         {@code false} if the check is not approved or the event's component cannot be resolved
+     */
+    public boolean isAuthorized(final ProvenanceEventRecord event, final 
NiFiUser user) {
+        // No authorizer configured: every event is treated as readable.
+        if (authorizer == null) {
+            return true;
+        }
+
+        final Authorizable eventAuthorizable;
+        try {
+            eventAuthorizable = 
resourceFactory.createProvenanceAuthorizable(event.getComponentId());
+        } catch (final ResourceNotFoundException rnfe) {
+            // The component behind the event could not be resolved; treat the event as not readable.
+            return false;
+        }
+
+        final AuthorizationResult result = 
eventAuthorizable.checkAuthorization(authorizer, RequestAction.READ, user);
+        return Result.Approved.equals(result.getResult());
+    }
+
+    /**
+     * Enforces READ access to the given provenance event for the given user. Returns without
+     * checking anything when no authorizer is configured.
+     * <p>
+     * Unlike {@code isAuthorized}, this method does not catch a
+     * {@code ResourceNotFoundException} raised while resolving the event's component.
+     * NOTE(review): a denied check presumably surfaces as an AccessDeniedException thrown by
+     * {@code Authorizable.authorize} - confirm against the Authorizable contract.
+     *
+     * @param event the event whose component id selects the resource to check
+     * @param user the user on whose behalf the READ check is made
+     */
+    protected void authorize(final ProvenanceEventRecord event, final NiFiUser 
user) {
+        if (authorizer == null) {
+            return;
+        }
+
+        final Authorizable eventAuthorizable = 
resourceFactory.createProvenanceAuthorizable(event.getComponentId());
+        eventAuthorizable.authorize(authorizer, RequestAction.READ, user);
+    }
+
+    private Filter<ProvenanceEventRecord> createFilter(final Query query, 
final NiFiUser user) {
         return new Filter<ProvenanceEventRecord>() {
             @Override
             public boolean select(final ProvenanceEventRecord event) {
+                if (!isAuthorized(event, user)) {
+                    return false;
+                }
+
                 if (query.getStartDate() != null && 
query.getStartDate().getTime() > event.getEventTime()) {
                     return false;
                 }
@@ -348,36 +411,51 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
     @Override
-    public QuerySubmission submitQuery(final Query query) {
+    public QuerySubmission submitQuery(final Query query, final NiFiUser user) 
{
         if (query.getEndDate() != null && query.getStartDate() != null && 
query.getStartDate().getTime() > query.getEndDate().getTime()) {
             throw new IllegalArgumentException("Query End Time cannot be 
before Query Start Time");
         }
 
         if (query.getSearchTerms().isEmpty() && query.getStartDate() == null 
&& query.getEndDate() == null) {
-            final AsyncQuerySubmission result = new 
AsyncQuerySubmission(query, 1);
-            queryExecService.submit(new QueryRunnable(ringBuffer, 
createFilter(query), query.getMaxResults(), result));
+            final AsyncQuerySubmission result = new 
AsyncQuerySubmission(query, 1, user.getIdentity());
+            queryExecService.submit(new QueryRunnable(ringBuffer, 
createFilter(query, user), query.getMaxResults(), result));
             querySubmissionMap.put(query.getIdentifier(), result);
             return result;
         }
 
-        final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1);
+        final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1, 
user.getIdentity());
         querySubmissionMap.put(query.getIdentifier(), result);
-        queryExecService.submit(new QueryRunnable(ringBuffer, 
createFilter(query), query.getMaxResults(), result));
+        queryExecService.submit(new QueryRunnable(ringBuffer, 
createFilter(query, user), query.getMaxResults(), result));
 
         return result;
     }
 
     @Override
-    public QuerySubmission retrieveQuerySubmission(final String 
queryIdentifier) {
-        return querySubmissionMap.get(queryIdentifier);
+    public QuerySubmission retrieveQuerySubmission(final String 
queryIdentifier, final NiFiUser user) {
+        final QuerySubmission submission = 
querySubmissionMap.get(queryIdentifier);
+        final String userId = submission.getSubmitterIdentity();
+
+        if (user == null && userId == null) {
+            return submission;
+        }
+
+        if (user == null) {
+            throw new AccessDeniedException("Cannot retrieve Provenance Query 
Submission because no user id was provided");
+        }
+
+        if (userId == null || userId.equals(user.getIdentity())) {
+            return submission;
+        }
+
+        throw new AccessDeniedException("Cannot retrieve Provenance Query 
Submission because " + user.getIdentity() + " is not the user who submitted the 
request");
     }
 
+    /**
+     * Computes the lineage of a single FlowFile by delegating to the private overload with a
+     * singleton UUID collection, {@code FLOWFILE_LINEAGE} as the computation type, and no event id.
+     * <p>
+     * The delegate goes through {@code submitLineageComputation}, which reads
+     * {@code user.getIdentity()}, so {@code user} must not be {@code null}.
+     *
+     * @param flowFileUUID the UUID of the FlowFile whose lineage is computed
+     * @param user the user on whose behalf the lineage is computed
+     * @return the lineage produced by the delegate
+     * @throws IOException declared by the delegate
+     */
-    public Lineage computeLineage(final String flowFileUUID) throws 
IOException {
-        return computeLineage(Collections.<String>singleton(flowFileUUID), 
LineageComputationType.FLOWFILE_LINEAGE, null);
+    public Lineage computeLineage(final String flowFileUUID, final NiFiUser 
user) throws IOException {
+        return computeLineage(Collections.<String> singleton(flowFileUUID), 
user, LineageComputationType.FLOWFILE_LINEAGE, null);
     }
 
-    private Lineage computeLineage(final Collection<String> flowFileUuids, 
final LineageComputationType computationType, final Long eventId) throws 
IOException {
-        final AsyncLineageSubmission submission = 
submitLineageComputation(flowFileUuids, computationType, eventId);
+    private Lineage computeLineage(final Collection<String> flowFileUuids, 
final NiFiUser user, final LineageComputationType computationType, final Long 
eventId) throws IOException {
+        final AsyncLineageSubmission submission = 
submitLineageComputation(flowFileUuids, user, computationType, eventId);
         final StandardLineageResult result = submission.getResult();
         while (!result.isFinished()) {
             try {
@@ -394,13 +472,28 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
+    /**
+     * Submits a lineage computation for a single FlowFile by delegating to the private overload
+     * with a singleton UUID collection, {@code FLOWFILE_LINEAGE} as the computation type, and no
+     * event id.
+     * <p>
+     * The delegate reads {@code user.getIdentity()}, so {@code user} must not be {@code null}.
+     *
+     * @param flowFileUuid the UUID of the FlowFile whose lineage is computed
+     * @param user the user submitting the computation
+     * @return the submission created by the delegate
+     */
     @Override
-    public AsyncLineageSubmission submitLineageComputation(final String 
flowFileUuid) {
-        return submitLineageComputation(Collections.singleton(flowFileUuid), 
LineageComputationType.FLOWFILE_LINEAGE, null);
+    public AsyncLineageSubmission submitLineageComputation(final String 
flowFileUuid, final NiFiUser user) {
+        return submitLineageComputation(Collections.singleton(flowFileUuid), 
user, LineageComputationType.FLOWFILE_LINEAGE, null);
     }
 
     @Override
-    public ComputeLineageSubmission retrieveLineageSubmission(String 
lineageIdentifier) {
-        return lineageSubmissionMap.get(lineageIdentifier);
+    public ComputeLineageSubmission retrieveLineageSubmission(String 
lineageIdentifier, final NiFiUser user) {
+        final ComputeLineageSubmission submission = 
lineageSubmissionMap.get(lineageIdentifier);
+        final String userId = submission.getSubmitterIdentity();
+
+        if (user == null && userId == null) {
+            return submission;
+        }
+
+        if (user == null) {
+            throw new AccessDeniedException("Cannot retrieve Provenance Query 
Submission because no user id was provided");
+        }
+
+        if (userId == null || userId.equals(user.getIdentity())) {
+            return submission;
+        }
+
+        throw new AccessDeniedException("Cannot retrieve Provenance Query 
Submission because " + user.getIdentity() + " is not the user who submitted the 
request");
     }
 
     public Lineage expandSpawnEventParents(String identifier) throws 
IOException {
@@ -408,10 +501,12 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
     @Override
-    public ComputeLineageSubmission submitExpandParents(final long eventId) {
-        final ProvenanceEventRecord event = getEvent(eventId);
+    public ComputeLineageSubmission submitExpandParents(final long eventId, 
final NiFiUser user) {
+        final String userId = user.getIdentity();
+
+        final ProvenanceEventRecord event = getEvent(eventId, user);
         if (event == null) {
-            final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String> emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
             
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
             return submission;
@@ -422,9 +517,9 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
             case FORK:
             case REPLAY:
             case CLONE:
-                return submitLineageComputation(event.getParentUuids(), 
LineageComputationType.EXPAND_PARENTS, eventId);
+                return submitLineageComputation(event.getParentUuids(), user, 
LineageComputationType.EXPAND_PARENTS, eventId);
             default: {
-                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String> emptyList(), 1, userId);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
                 submission.getResult().setError("Event ID " + eventId + " 
indicates an event of type " + event.getEventType() + " so its parents cannot 
be expanded");
                 return submission;
@@ -437,10 +532,12 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
     }
 
     @Override
-    public ComputeLineageSubmission submitExpandChildren(final long eventId) {
-        final ProvenanceEventRecord event = getEvent(eventId);
+    public ComputeLineageSubmission submitExpandChildren(final long eventId, 
final NiFiUser user) {
+        final String userId = user.getIdentity();
+
+        final ProvenanceEventRecord event = getEvent(eventId, user);
         if (event == null) {
-            final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String> emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
             
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
             return submission;
@@ -451,9 +548,9 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
             case FORK:
             case REPLAY:
             case CLONE:
-                return submitLineageComputation(event.getChildUuids(), 
LineageComputationType.EXPAND_CHILDREN, eventId);
+                return submitLineageComputation(event.getChildUuids(), user, 
LineageComputationType.EXPAND_CHILDREN, eventId);
             default: {
-                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String> emptyList(), 1, userId);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
                 submission.getResult().setError("Event ID " + eventId + " 
indicates an event of type " + event.getEventType() + " so its children cannot 
be expanded");
                 return submission;
@@ -461,13 +558,18 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         }
     }
 
-    private AsyncLineageSubmission submitLineageComputation(final 
Collection<String> flowFileUuids, final LineageComputationType computationType, 
final Long eventId) {
-        final AsyncLineageSubmission result = new 
AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1);
+    private AsyncLineageSubmission submitLineageComputation(final 
Collection<String> flowFileUuids, final NiFiUser user, final 
LineageComputationType computationType, final Long eventId) {
+        final String userId = user.getIdentity();
+        final AsyncLineageSubmission result = new 
AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1, userId);
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
 
         final Filter<ProvenanceEventRecord> filter = new 
Filter<ProvenanceEventRecord>() {
             @Override
             public boolean select(final ProvenanceEventRecord event) {
+                if (user != null && !isAuthorized(event, user)) {
+                    return false;
+                }
+
                 if (flowFileUuids.contains(event.getFlowFileUuid())) {
                     return true;
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9e2fdf/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
index 3c3e401..d35ceac 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -70,7 +71,7 @@ public class TestVolatileProvenanceRepository {
         assertEquals(10, retrieved.size());
         for (int i = 0; i < 10; i++) {
             final ProvenanceEventRecord recovered = retrieved.get(i);
-            assertEquals((long) i, recovered.getEventId());
+            assertEquals(i, recovered.getEventId());
             assertEquals("nifi://unit-test", recovered.getTransitUri());
             assertEquals(ProvenanceEventType.RECEIVE, 
recovered.getEventType());
             assertEquals(attributes, recovered.getAttributes());
@@ -108,7 +109,7 @@ public class TestVolatileProvenanceRepository {
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
         query.setMaxResults(100);
 
-        final QuerySubmission submission = repo.submitQuery(query);
+        final QuerySubmission submission = repo.submitQuery(query, 
createUser());
         while (!submission.getResult().isFinished()) {
             Thread.sleep(100L);
         }
@@ -175,4 +176,27 @@ public class TestVolatileProvenanceRepository {
         };
     }
 
+    /**
+     * Builds a fixed, non-anonymous stub user for the tests in this class that must pass a
+     * {@link NiFiUser} to the repository.
+     *
+     * @return a user whose identity is "unit-test", whose user name is "Unit Test", whose chain
+     *         is {@code null}, and for which {@code isAnonymous()} is {@code false}
+     */
+    private NiFiUser createUser() {
+        return new NiFiUser() {
+            @Override
+            public String getIdentity() {
+                return "unit-test";
+            }
+
+            @Override
+            public String getUserName() {
+                return "Unit Test";
+            }
+
+            @Override
+            public NiFiUser getChain() {
+                return null;
+            }
+
+            @Override
+            public boolean isAnonymous() {
+                return false;
+            }
+        };
+    }
 }

Reply via email to