Repository: nifi Updated Branches: refs/heads/master 65d895827 -> ae9e2fdf0
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9e2fdf/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 3238d97..5417669 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -19,6 +19,7 @@ package org.apache.nifi.provenance; import static org.apache.nifi.provenance.TestUtil.createFlowFile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; import org.apache.lucene.analysis.Analyzer; @@ -51,6 +53,8 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.provenance.lineage.EventNode; @@ -173,7 +177,7 @@ public class TestPersistentProvenanceRepository { config.setJournalCount(10); config.setQueryThreadPoolSize(10); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -221,7 +225,7 @@ public class TestPersistentProvenanceRepository { System.out.println("Closing and re-initializing"); repo.close(); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); System.out.println("Re-initialized"); final long fetchStart = System.nanoTime(); @@ -241,7 +245,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileCapacity(1L); config.setMaxEventFileLife(1, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -267,7 +271,7 @@ public class TestPersistentProvenanceRepository { Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.) repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12); assertEquals(10, recoveredRecords.size()); @@ -290,7 +294,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(2, TimeUnit.SECONDS); config.setSearchableFields(searchableFields); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -331,7 +335,7 @@ public class TestPersistentProvenanceRepository { query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "XXXX")); query.setMaxResults(100); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(2, result.getMatchingEvents().size()); for (final ProvenanceEventRecord match : result.getMatchingEvents()) { System.out.println(match); @@ -344,7 +348,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -375,7 +379,7 @@ public class TestPersistentProvenanceRepository { query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); query.setMaxResults(100); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(10, result.getMatchingEvents().size()); for (final ProvenanceEventRecord match : result.getMatchingEvents()) { System.out.println(match); @@ -388,7 +392,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setCompressOnRollover(true); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -426,7 +430,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "10000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -458,7 +462,7 @@ public class TestPersistentProvenanceRepository { query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); query.setMaxResults(100); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(10, result.getMatchingEvents().size()); for (final ProvenanceEventRecord match : result.getMatchingEvents()) { System.out.println(match); @@ -471,7 +475,7 @@ public class TestPersistentProvenanceRepository { repo.purgeOldEvents(); Thread.sleep(2000L); - final QueryResult newRecordSet = repo.queryEvents(query); + final QueryResult newRecordSet = repo.queryEvents(query, createUser()); assertTrue(newRecordSet.getMatchingEvents().isEmpty()); } @@ -486,7 +490,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -525,7 +529,7 @@ public class TestPersistentProvenanceRepository { query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); query.setMaxResults(100); - final QuerySubmission submission = repo.submitQuery(query); + final QuerySubmission submission = repo.submitQuery(query, createUser()); while (!submission.getResult().isFinished()) { Thread.sleep(100L); } @@ -555,7 +559,7 @@ public class TestPersistentProvenanceRepository { Thread.sleep(2000L); // purge is async. Give it time to do its job. query.setMaxResults(100); - final QuerySubmission noResultSubmission = repo.submitQuery(query); + final QuerySubmission noResultSubmission = repo.submitQuery(query, createUser()); while (!noResultSubmission.getResult().isFinished()) { Thread.sleep(10L); } @@ -573,7 +577,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -601,7 +605,7 @@ public class TestPersistentProvenanceRepository { final Query query = new Query(UUID.randomUUID().toString()); query.setMaxResults(100); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(10, result.getMatchingEvents().size()); for (final ProvenanceEventRecord match : result.getMatchingEvents()) { System.out.println(match); @@ -615,7 +619,7 @@ public class TestPersistentProvenanceRepository { Thread.sleep(1000L); - final QueryResult newRecordSet = repo.queryEvents(query); + final QueryResult newRecordSet = repo.queryEvents(query, createUser()); assertTrue(newRecordSet.getMatchingEvents().isEmpty()); } @@ -629,7 +633,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000001"; final Map<String, String> attributes = new HashMap<>(); @@ -655,7 +659,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); - final Lineage lineage = repo.computeLineage(uuid); + final Lineage lineage = repo.computeLineage(uuid, createUser()); assertNotNull(lineage); // Nodes should consist of a RECEIVE followed by FlowFileNode, followed by a DROP @@ -684,7 +688,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000001"; final Map<String, String> attributes = new HashMap<>(); @@ -710,7 +714,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); - final AsyncLineageSubmission submission = repo.submitLineageComputation(uuid); + final AsyncLineageSubmission submission = repo.submitLineageComputation(uuid, createUser()); while (!submission.getResult().isFinished()) { Thread.sleep(100L); } @@ -743,7 +747,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String childId = "00000000-0000-0000-0000-000000000000"; @@ -773,7 +777,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); - final Lineage lineage = repo.computeLineage(childId); + final Lineage lineage = repo.computeLineage(childId, createUser()); assertNotNull(lineage); // these are not necessarily accurate asserts.... @@ -793,7 +797,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String childId = "00000000-0000-0000-0000-000000000000"; @@ -823,7 +827,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); - final AsyncLineageSubmission submission = repo.submitLineageComputation(childId); + final AsyncLineageSubmission submission = repo.submitLineageComputation(childId, createUser()); while (!submission.getResult().isFinished()) { Thread.sleep(100L); } @@ -840,7 +844,7 @@ public class TestPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(1, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -866,13 +870,13 @@ public class TestPersistentProvenanceRepository { repo.close(); final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - secondRepo.initialize(getEventReporter()); + secondRepo.initialize(getEventReporter(), null, null); try { final ProvenanceEventRecord event11 = builder.build(); secondRepo.registerEvent(event11); secondRepo.waitForRollover(); - final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L); + final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L, null); assertNotNull(event11Retrieved); assertEquals(10, event11Retrieved.getEventId()); } finally { @@ -897,7 +901,7 @@ public class TestPersistentProvenanceRepository { in.writeInt(4); in.close(); assertTrue(eventFile.exists()); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(10, result.getMatchingEvents().size()); } @@ -914,7 +918,7 @@ public class TestPersistentProvenanceRepository { query.setMaxResults(100); DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile))); in.close(); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(10, result.getMatchingEvents().size()); } @@ -927,7 +931,7 @@ public class TestPersistentProvenanceRepository { config.setDesiredIndexSize(10); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); String uuid = UUID.randomUUID().toString(); for (int i = 0; i < 20; i++) { @@ -957,7 +961,7 @@ public class TestPersistentProvenanceRepository { config.setDesiredIndexSize(10); // force new index to be created for each rollover repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1003,7 +1007,7 @@ public class TestPersistentProvenanceRepository { query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); query.setMaxResults(100); - final QueryResult result = repo.queryEvents(query); + final QueryResult result = repo.queryEvents(query, createUser()); assertEquals(20, result.getMatchingEvents().size()); // Ensure index directories exists @@ -1022,7 +1026,7 @@ public class TestPersistentProvenanceRepository { repo.purgeOldEvents(); Thread.sleep(2000L); - final QueryResult newRecordSet = repo.queryEvents(query); + final QueryResult newRecordSet = repo.queryEvents(query, createUser()); assertEquals(10, newRecordSet.getMatchingEvents().size()); // Ensure that one index directory is gone @@ -1030,6 +1034,291 @@ public class TestPersistentProvenanceRepository { assertEquals(1, indexDirs.length); } + @Test + public void testNotAuthorizedGetSpecificEvent() throws IOException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(5, TimeUnit.MINUTES); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(10); // force new index to be created for each rollover + + final AccessDeniedException expectedException = new AccessDeniedException("Unit Test - Intentionally Thrown"); + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + protected void authorize(ProvenanceEventRecord event, NiFiUser user) { + throw expectedException; + } + }; + + repo.initialize(getEventReporter(), null, null); + + final String uuid = "00000000-0000-0000-0000-000000000000"; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + for (int i = 0; i < 10; i++) { + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(10L); // make sure the events are destroyed when we call purge + repo.registerEvent(builder.build()); + } + + repo.waitForRollover(); + + try { + repo.getEvent(0L, null); + Assert.fail("getEvent() did not throw an Exception"); + } catch (final Exception e) { + Assert.assertSame(expectedException, e); + } + } + + @Test + public void testNotAuthorizedGetEventRange() throws IOException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(5, TimeUnit.MINUTES); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(10); // force new index to be created for each rollover + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser user) { + return event.getEventId() > 2; + } + }; + + repo.initialize(getEventReporter(), null, null); + + final String uuid = "00000000-0000-0000-0000-000000000000"; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + for (int i = 0; i < 10; i++) { + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(10L); // make sure the events are destroyed when we call purge + repo.registerEvent(builder.build()); + } + + repo.waitForRollover(); + + final List<ProvenanceEventRecord> events = repo.getEvents(0L, 10, null); + + // Ensure that we gets events with ID's 3 through 10. + assertEquals(7, events.size()); + final List<Long> eventIds = events.stream().map(event -> event.getEventId()).sorted().collect(Collectors.toList()); + for (int i = 0; i < 7; i++) { + Assert.assertEquals(i + 3, eventIds.get(i).intValue()); + } + } + + @Test(timeout = 10000) + public void testNotAuthorizedQuery() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(5, TimeUnit.MINUTES); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(10); // force new index to be created for each rollover + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser user) { + return event.getEventId() > 2; + } + }; + + repo.initialize(getEventReporter(), null, null); + + final String uuid = "00000000-0000-0000-0000-000000000000"; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + for (int i = 0; i < 10; i++) { + attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); + builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); + builder.setEventTime(10L); // make sure the events are destroyed when we call purge + repo.registerEvent(builder.build()); + } + + repo.waitForRollover(); + + final Query query = new Query("1234"); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234")); + final QuerySubmission submission = repo.submitQuery(query, createUser()); + + final QueryResult result = submission.getResult(); + while (!result.isFinished()) { + Thread.sleep(100L); + } + + // Ensure that we gets events with ID's 3 through 10. + final List<ProvenanceEventRecord> events = result.getMatchingEvents(); + assertEquals(7, events.size()); + final List<Long> eventIds = events.stream().map(event -> event.getEventId()).sorted().collect(Collectors.toList()); + for (int i = 0; i < 7; i++) { + Assert.assertEquals(i + 3, eventIds.get(i).intValue()); + } + } + + + @Test(timeout = 1000000) + public void testNotAuthorizedLineage() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxRecordLife(5, TimeUnit.MINUTES); + config.setMaxStorageCapacity(1024L * 1024L); + config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); + config.setMaxEventFileCapacity(1024L * 1024L); + config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); + config.setDesiredIndexSize(10); // force new index to be created for each rollover + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser user) { + return event.getEventType() != ProvenanceEventType.ATTRIBUTES_MODIFIED; + } + }; + + repo.initialize(getEventReporter(), null, null); + + final String uuid = "00000000-0000-0000-0000-000000000000"; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + attributes.put("uuid", uuid); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + builder.setEventTime(10L); // make sure the events are destroyed when we call purge + + builder.fromFlowFile(createFlowFile(1, 3000L, attributes)); + repo.registerEvent(builder.build()); + + builder.setEventType(ProvenanceEventType.CONTENT_MODIFIED); + builder.fromFlowFile(createFlowFile(2, 2000L, attributes)); + repo.registerEvent(builder.build()); + + builder.setEventType(ProvenanceEventType.CONTENT_MODIFIED); + builder.fromFlowFile(createFlowFile(3, 2000L, attributes)); + repo.registerEvent(builder.build()); + + builder.setEventType(ProvenanceEventType.ATTRIBUTES_MODIFIED); + attributes.put("new-attr", "yes"); + builder.fromFlowFile(createFlowFile(4, 2000L, attributes)); + repo.registerEvent(builder.build()); + + final Map<String, String> childAttributes = new HashMap<>(attributes); + childAttributes.put("uuid", "00000000-0000-0000-0000-000000000001"); + builder.setEventType(ProvenanceEventType.FORK); + builder.fromFlowFile(createFlowFile(4, 2000L, attributes)); + builder.addChildFlowFile(createFlowFile(5, 2000L, childAttributes)); + builder.addParentFlowFile(createFlowFile(4, 2000L, attributes)); + repo.registerEvent(builder.build()); + + builder.setEventType(ProvenanceEventType.ATTRIBUTES_MODIFIED); + builder.fromFlowFile(createFlowFile(6, 2000L, childAttributes)); + repo.registerEvent(builder.build()); + + builder.setEventType(ProvenanceEventType.DROP); + builder.fromFlowFile(createFlowFile(6, 2000L, childAttributes)); + repo.registerEvent(builder.build()); + + repo.waitForRollover(); + + final AsyncLineageSubmission originalLineage = repo.submitLineageComputation(uuid, createUser()); + + final StandardLineageResult result = originalLineage.getResult(); + while (!result.isFinished()) { + Thread.sleep(100L); + } + + final List<LineageNode> lineageNodes = result.getNodes(); + assertEquals(6, lineageNodes.size()); + + assertEquals(1, lineageNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.FLOWFILE_NODE).count()); + assertEquals(5, lineageNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.PROVENANCE_EVENT_NODE).count()); + + final Set<EventNode> eventNodes = lineageNodes.stream() + .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE) + .map(node -> (EventNode) node) + .collect(Collectors.toSet()); + + final Map<ProvenanceEventType, List<EventNode>> nodesByType = eventNodes.stream().collect(Collectors.groupingBy(EventNode::getEventType)); + assertEquals(1, nodesByType.get(ProvenanceEventType.RECEIVE).size()); + assertEquals(2, nodesByType.get(ProvenanceEventType.CONTENT_MODIFIED).size()); + assertEquals(1, nodesByType.get(ProvenanceEventType.FORK).size()); + + assertEquals(1, nodesByType.get(ProvenanceEventType.UNKNOWN).size()); + assertNull(nodesByType.get(ProvenanceEventType.ATTRIBUTES_MODIFIED)); + + // Test filtering on expandChildren + final AsyncLineageSubmission expandChild = repo.submitExpandChildren(4L, createUser()); + final StandardLineageResult expandChildResult = expandChild.getResult(); + while (!expandChildResult.isFinished()) { + Thread.sleep(100L); + } + + final List<LineageNode> expandChildNodes = expandChildResult.getNodes(); + assertEquals(4, expandChildNodes.size()); + + assertEquals(1, expandChildNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.FLOWFILE_NODE).count()); + assertEquals(3, expandChildNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.PROVENANCE_EVENT_NODE).count()); + + final Set<EventNode> childEventNodes = expandChildNodes.stream() + .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE) + .map(node -> (EventNode) node) + .collect(Collectors.toSet()); + + final Map<ProvenanceEventType, List<EventNode>> childNodesByType = childEventNodes.stream().collect(Collectors.groupingBy(EventNode::getEventType)); + assertEquals(1, childNodesByType.get(ProvenanceEventType.FORK).size()); + assertEquals(1, childNodesByType.get(ProvenanceEventType.DROP).size()); + assertEquals(1, childNodesByType.get(ProvenanceEventType.UNKNOWN).size()); + assertNull(childNodesByType.get(ProvenanceEventType.ATTRIBUTES_MODIFIED)); + } + + @Test public void testBackPressure() throws IOException, InterruptedException { @@ -1045,7 +1334,7 @@ public class TestPersistentProvenanceRepository { return journalCountRef.get(); } }; - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); @@ -1103,7 +1392,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1170,7 +1459,7 @@ public class TestPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(3, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); @@ -1221,7 +1510,7 @@ public class TestPersistentProvenanceRepository { config.setMaxAttributeChars(50); config.setMaxEventFileLife(3, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); @@ -1240,7 +1529,7 @@ public class TestPersistentProvenanceRepository { repo.registerEvent(record); repo.waitForRollover(); - final ProvenanceEventRecord retrieved = repo.getEvent(0L); + final ProvenanceEventRecord retrieved = repo.getEvent(0L, null); assertNotNull(retrieved); assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid")); assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars")); @@ -1269,7 +1558,7 @@ public class TestPersistentProvenanceRepository { }; // initialize with our event reporter - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); // create some events in the journal files. final Map<String, String> attributes = new HashMap<>(); @@ -1346,7 +1635,7 @@ public class TestPersistentProvenanceRepository { return spiedWriters; } }; - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); @@ -1407,7 +1696,7 @@ public class TestPersistentProvenanceRepository { }; try { - recoveryRepo.initialize(getEventReporter()); + recoveryRepo.initialize(getEventReporter(), null, null); } finally { recoveryRepo.close(); } @@ -1437,4 +1726,29 @@ public class TestPersistentProvenanceRepository { return severity; } } + + private NiFiUser createUser() { + return new NiFiUser() { + @Override + public String getIdentity() { + return "unit-test"; + } + + @Override + public String getUserName() { + return "Unit Test"; + } + + @Override + public NiFiUser getChain() { + return null; + } + + @Override + public boolean isAnonymous() { + return false; + } + }; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9e2fdf/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index f4f9d12..888f55a 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -37,6 +37,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; @@ -55,6 +62,7 @@ import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer.Filter; import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.RingBuffer.IterationDirection; +import org.apache.nifi.web.ResourceNotFoundException; public class VolatileProvenanceRepository implements ProvenanceEventRepository { @@ -75,6 +83,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { private final AtomicLong idGenerator = new AtomicLong(0L); private final AtomicBoolean initialized = new AtomicBoolean(false); + private Authorizer authorizer; // effectively final + private ProvenanceAuthorizableFactory resourceFactory; // effectively final + public VolatileProvenanceRepository() { final NiFiProperties properties = NiFiProperties.getInstance(); @@ -103,11 +114,14 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override - public void initialize(final EventReporter eventReporter) { + public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws IOException { if (initialized.getAndSet(true)) { return; } + this.authorizer = authorizer; + this.resourceFactory = resourceFactory; + scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 30L, TimeUnit.SECONDS); } @@ -131,9 +145,18 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { @Override public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException { + return getEvents(firstRecordId, maxRecords, null); + } + + @Override + public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException { return ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() { @Override public boolean select(final ProvenanceEventRecord value) { + if (user != null && !isAuthorized(value, user)) { + return false; + } + return value.getEventId() >= firstRecordId; } }, maxRecords); @@ -155,8 +178,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { return records.isEmpty() ? null : records.get(0); } - @Override - public ProvenanceEventRecord getEvent(final long id) { + private ProvenanceEventRecord getEvent(final long id) { final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() { @Override public boolean select(final ProvenanceEventRecord event) { @@ -168,6 +190,17 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override + public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) { + final ProvenanceEventRecord event = getEvent(id); + if (event == null) { + return null; + } + + authorize(event, user); + return event; + } + + @Override public void close() throws IOException { queryExecService.shutdownNow(); scheduledExecService.shutdown(); @@ -183,8 +216,8 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { return searchableAttributes; } - public QueryResult queryEvents(final Query query) throws IOException { - final QuerySubmission submission = submitQuery(query); + public QueryResult queryEvents(final Query query, final NiFiUser user) throws IOException { + final QuerySubmission submission = submitQuery(query, user); final QueryResult result = submission.getResult(); while (!result.isFinished()) { try { @@ -200,10 +233,40 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { return result; } - private Filter<ProvenanceEventRecord> createFilter(final Query query) { + + public boolean isAuthorized(final ProvenanceEventRecord event, final NiFiUser user) { + if (authorizer == null) { + return true; + } + + final Authorizable eventAuthorizable; + try { + eventAuthorizable = resourceFactory.createProvenanceAuthorizable(event.getComponentId()); + } catch (final ResourceNotFoundException rnfe) { + return false; + } + + final AuthorizationResult result = eventAuthorizable.checkAuthorization(authorizer, RequestAction.READ, user); + return Result.Approved.equals(result.getResult()); + } + + protected void authorize(final ProvenanceEventRecord event, final NiFiUser user) { + if (authorizer == null) { + return; + } + + final Authorizable eventAuthorizable = resourceFactory.createProvenanceAuthorizable(event.getComponentId()); + eventAuthorizable.authorize(authorizer, RequestAction.READ, user); + } + + private Filter<ProvenanceEventRecord> createFilter(final Query query, final NiFiUser user) { return new Filter<ProvenanceEventRecord>() { @Override public boolean select(final ProvenanceEventRecord event) { + if (!isAuthorized(event, user)) { + return false; + } + if (query.getStartDate() != null && query.getStartDate().getTime() > event.getEventTime()) { return false; } @@ -348,36 +411,51 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override - public QuerySubmission submitQuery(final Query query) { + public QuerySubmission submitQuery(final Query query, final NiFiUser user) { if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { throw new IllegalArgumentException("Query End Time cannot be before Query Start Time"); } if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) { - final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1); - queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query), query.getMaxResults(), result)); + final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1, user.getIdentity()); + queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query, user), query.getMaxResults(), result)); querySubmissionMap.put(query.getIdentifier(), result); return result; } - final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1); + final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1, user.getIdentity()); querySubmissionMap.put(query.getIdentifier(), result); - queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query), query.getMaxResults(), result)); + queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query, user), query.getMaxResults(), result)); return result; } @Override - public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) { - return querySubmissionMap.get(queryIdentifier); + public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) { + final QuerySubmission submission = querySubmissionMap.get(queryIdentifier); + final String userId = submission.getSubmitterIdentity(); + + if (user == null && userId == null) { + return submission; + } + + if (user == null) { + throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because no user id was provided"); + } + + if (userId == null || userId.equals(user.getIdentity())) { + return submission; + } + + throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request"); } - public Lineage computeLineage(final String flowFileUUID) throws IOException { - return computeLineage(Collections.<String>singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null); + public Lineage computeLineage(final String flowFileUUID, final NiFiUser user) throws IOException { + return computeLineage(Collections.<String> singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null); } - private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) throws IOException { - final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId); + private Lineage computeLineage(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) throws IOException { + final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, user, computationType, eventId); final StandardLineageResult result = submission.getResult(); while (!result.isFinished()) { try { @@ -394,13 +472,28 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override - public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid) { - return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null); + public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) { + return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null); } @Override - public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier) { - return lineageSubmissionMap.get(lineageIdentifier); + public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, final NiFiUser user) { + final ComputeLineageSubmission submission = lineageSubmissionMap.get(lineageIdentifier); + final String userId = submission.getSubmitterIdentity(); + + if (user == null && userId == null) { + return submission; + } + + if (user == null) { + throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because no user id was provided"); + } + + if (userId == null || userId.equals(user.getIdentity())) { + return submission; + } + + throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request"); } public Lineage expandSpawnEventParents(String identifier) throws IOException { @@ -408,10 +501,12 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override - public ComputeLineageSubmission submitExpandParents(final long eventId) { - final ProvenanceEventRecord event = getEvent(eventId); + public ComputeLineageSubmission submitExpandParents(final long eventId, final NiFiUser user) { + final String userId = user.getIdentity(); + + final ProvenanceEventRecord event = getEvent(eventId, user); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); return submission; @@ -422,9 +517,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { case FORK: case REPLAY: case CLONE: - return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId); + return submitLineageComputation(event.getParentUuids(), user, LineageComputationType.EXPAND_PARENTS, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); return submission; @@ -437,10 +532,12 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } @Override - public ComputeLineageSubmission submitExpandChildren(final long eventId) { - final ProvenanceEventRecord event = getEvent(eventId); + public ComputeLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) { + final String userId = user.getIdentity(); + + final ProvenanceEventRecord event = getEvent(eventId, user); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); return submission; @@ -451,9 +548,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { case FORK: case REPLAY: case CLONE: - return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId); + return submitLineageComputation(event.getChildUuids(), user, LineageComputationType.EXPAND_CHILDREN, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); return submission; @@ -461,13 +558,18 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } } - private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) { - final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1); + private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) { + final String userId = user.getIdentity(); + final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1, userId); lineageSubmissionMap.put(result.getLineageIdentifier(), result); final Filter<ProvenanceEventRecord> filter = new Filter<ProvenanceEventRecord>() { @Override public boolean select(final ProvenanceEventRecord event) { + if (user != null && !isAuthorized(event, user)) { + return false; + } + if (flowFileUuids.contains(event.getFlowFileUuid())) { return true; } http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9e2fdf/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java index 3c3e401..d35ceac 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; @@ -70,7 +71,7 @@ public class TestVolatileProvenanceRepository { assertEquals(10, retrieved.size()); for (int i = 0; i < 10; i++) { final ProvenanceEventRecord recovered = retrieved.get(i); - assertEquals((long) i, recovered.getEventId()); + assertEquals(i, recovered.getEventId()); assertEquals("nifi://unit-test", recovered.getTransitUri()); assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType()); assertEquals(attributes, recovered.getAttributes()); @@ -108,7 +109,7 @@ public class TestVolatileProvenanceRepository { query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); query.setMaxResults(100); - final QuerySubmission submission = repo.submitQuery(query); + final QuerySubmission submission = repo.submitQuery(query, createUser()); while (!submission.getResult().isFinished()) { Thread.sleep(100L); } @@ -175,4 +176,27 @@ public class TestVolatileProvenanceRepository { }; } + private NiFiUser createUser() { + return new NiFiUser() { + @Override + public String getIdentity() { + return "unit-test"; + } + + @Override + public String getUserName() { + return "Unit Test"; + } + + @Override + public NiFiUser getChain() { + return null; + } + + @Override + public boolean isAnonymous() { + return false; + } + }; + } }
