This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c19db9d  NIFI-7375: This closes #4218. Fixed a bug that caused 
Provenance Events not to show up in specific situations when clicking View 
Provenance for a Processor. - Added System-level tests for Provenance 
repository to reproduce behavior. - Added a Provenance Client to the CLI, which 
is necessary for System-level tests. - Added small additional configuration for 
Provenance repository to simplify development of system tests - Minor 
improvements to system tests (such as ability  [...]
c19db9d is described below

commit c19db9d623455b157d59c75c49f4ba0726693b43
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Apr 15 12:24:46 2020 -0400

    NIFI-7375: This closes #4218. Fixed a bug that caused Provenance Events not 
to show up in specific situations when clicking View Provenance for a Processor.
    - Added System-level tests for Provenance repository to reproduce behavior.
    - Added a Provenance Client to the CLI, which is necessary for System-level 
tests.
    - Added small additional configuration for Provenance repository to 
simplify development of system tests
    - Minor improvements to system tests (such as ability to destroy 
environment between tests) needed for Provenance repository based system tests
    
    Signed-off-by: Joe Witt <joew...@apache.org>
---
 .../nifi/provenance/StandardQueryResult.java       |  10 +-
 .../java/org/apache/nifi/util/NiFiProperties.java  |   1 +
 .../nifi/provenance/RepositoryConfiguration.java   |  18 +-
 .../lucene/LatestEventsPerProcessorQuery.java      |   5 +
 .../provenance/index/lucene/LatestEventsQuery.java |   4 +
 .../provenance/index/lucene/LuceneEventIndex.java  |   5 +-
 .../nifi/provenance/index/lucene/QueryTask.java    |  12 +-
 .../provenance/store/PartitionedEventStore.java    |   3 +-
 .../provenance/store/WriteAheadStorePartition.java |  27 ++-
 .../processors/tests/system/TerminateFlowFile.java |  35 ++++
 .../services/org.apache.nifi.processor.Processor   |   5 +-
 .../nifi/tests/system/InstanceConfiguration.java   |  18 ++
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  46 +++++
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  21 ++-
 .../SpawnedStandaloneNiFiInstanceFactory.java      |  18 ++
 .../system/provenance/ProvenanceRepositoryIT.java  | 205 +++++++++++++++++++++
 .../test/resources/conf/default/nifi.properties    |   1 +
 .../toolkit/cli/impl/client/NiFiClientFactory.java |  16 ++
 .../toolkit/cli/impl/client/nifi/NiFiClient.java   |   8 +
 .../cli/impl/client/nifi/ProvenanceClient.java     |  36 ++++
 .../impl/client/nifi/impl/JerseyNiFiClient.java    |  18 ++
 .../client/nifi/impl/JerseyProvenanceClient.java   | 120 ++++++++++++
 22 files changed, 613 insertions(+), 19 deletions(-)

diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
index 2777339..eab3a46 100644
--- 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
@@ -48,6 +48,7 @@ public class StandardQueryResult implements QueryResult, 
ProgressiveResult {
     private final Lock writeLock = rwLock.writeLock();
     // guarded by writeLock
     private final SortedSet<ProvenanceEventRecord> matchingRecords = new 
TreeSet<>(new EventIdComparator());
+    private long hitCount = 0L;
     private int numCompletedSteps = 0;
     private Date expirationDate;
     private String error;
@@ -163,6 +164,7 @@ public class StandardQueryResult implements QueryResult, 
ProgressiveResult {
             }
 
             this.matchingRecords.addAll(newEvents);
+            hitCount += totalHits;
 
             // If we've added more records than the query's max, then remove 
the trailing elements.
             // We do this, rather than avoiding the addition of the elements 
because we want to choose
@@ -188,10 +190,12 @@ public class StandardQueryResult implements QueryResult, 
ProgressiveResult {
                 queryComplete = true;
 
                 if (numCompletedSteps >= numSteps) {
-                    logger.info("Completed {} comprised of {} steps in {} 
millis", query, numSteps, queryTime);
+                    logger.info("Completed {} comprised of {} steps in {} 
millis. Index found {} hits. Read {} events from Event Files.",
+                        query, numSteps, queryTime, hitCount, 
matchingRecords.size());
                 } else {
-                    logger.info("Completed {} comprised of {} steps in {} 
millis (only completed {} steps because the maximum number of results was 
reached)",
-                        query, numSteps, queryTime, numCompletedSteps);
+                    logger.info("Completed {} comprised of {} steps in {} 
millis. Index found {} hits. Read {} events from Event Files. "
+                            + "Only completed {} steps because the maximum 
number of results was reached.",
+                        query, numSteps, queryTime, hitCount, 
matchingRecords.size(), numCompletedSteps);
                 }
             }
         } finally {
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 3fea5df..6bbc921 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -126,6 +126,7 @@ public abstract class NiFiProperties {
     public static final String PROVENANCE_MAX_STORAGE_SIZE = 
"nifi.provenance.repository.max.storage.size";
     public static final String PROVENANCE_ROLLOVER_TIME = 
"nifi.provenance.repository.rollover.time";
     public static final String PROVENANCE_ROLLOVER_SIZE = 
"nifi.provenance.repository.rollover.size";
+    public static final String PROVENANCE_ROLLOVER_EVENT_COUNT = 
"nifi.provenance.repository.rollover.events";
     public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = 
"nifi.provenance.repository.query.threads";
     public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = 
"nifi.provenance.repository.index.threads";
     public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = 
"nifi.provenance.repository.compress.on.rollover";
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index b04bca7..53549aa 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -39,6 +39,7 @@ public class RepositoryConfiguration {
 
     public static final String CONCURRENT_MERGE_THREADS = 
"nifi.provenance.repository.concurrent.merge.threads";
     public static final String WARM_CACHE_FREQUENCY = 
"nifi.provenance.repository.warm.cache.frequency";
+    public static final String MAINTENACE_FREQUENCY = 
"nifi.provenance.repository.maintenance.frequency";
 
     private final Map<String, File> storageDirectories = new LinkedHashMap<>();
     private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, 
TimeUnit.HOURS);
@@ -51,6 +52,7 @@ public class RepositoryConfiguration {
     private int compressionBlockBytes = 1024 * 1024;
     private int maxAttributeChars = 65536;
     private int debugFrequency = 1_000_000;
+    private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);
 
     // TODO: Delegaate to RepositoryEncryptionConfiguration in NIFI-6617
     private Map<String, String> encryptionKeys;
@@ -416,6 +418,14 @@ public class RepositoryConfiguration {
         this.debugFrequency = debugFrequency;
     }
 
+    public long getMaintenanceFrequency(final TimeUnit timeUnit) {
+        return timeUnit.convert(maintenanceFrequencyMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    public void setMaintenanceFrequency(final long period, final TimeUnit 
timeUnit) {
+        this.maintenanceFrequencyMillis = timeUnit.toMillis(period);
+    }
+
 
     public static RepositoryConfiguration create(final NiFiProperties 
nifiProperties) {
         final Map<String, Path> storageDirectories = 
nifiProperties.getProvenanceRepositoryPaths();
@@ -426,13 +436,14 @@ public class RepositoryConfiguration {
         final String storageSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
         final String rolloverTime = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
         final String rolloverSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
+        final int rolloverEventCount = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT,
 Integer.MAX_VALUE);
         final String shardSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 
MB");
         final int queryThreads = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE,
 2);
         final int indexThreads = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE,
 2);
         final int journalCount = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
         final int concurrentMergeThreads = 
nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
         final String warmCacheFrequency = 
nifiProperties.getProperty(WARM_CACHE_FREQUENCY);
-
+        final String maintenanceFrequency = 
nifiProperties.getProperty(MAINTENACE_FREQUENCY);
         final long storageMillis = FormatUtils.getTimeDuration(storageTime, 
TimeUnit.MILLISECONDS);
         final long maxStorageBytes = DataUnit.parseDataSize(storageSize, 
DataUnit.B).longValue();
         final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, 
TimeUnit.MILLISECONDS);
@@ -475,6 +486,7 @@ public class RepositoryConfiguration {
         config.setSearchableFields(searchableFields);
         config.setSearchableAttributes(searchableAttributes);
         config.setMaxEventFileCapacity(rolloverBytes);
+        config.setMaxEventFileCount(rolloverEventCount);
         config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
         config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
         config.setMaxStorageCapacity(maxStorageBytes);
@@ -490,6 +502,10 @@ public class RepositoryConfiguration {
         if (shardSize != null) {
             config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, 
DataUnit.B).longValue());
         }
+        if (maintenanceFrequency != null && 
!maintenanceFrequency.trim().equals("")) {
+            final long millis = 
FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS);
+            config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS);
+        }
 
         config.setAlwaysSync(alwaysSync);
 
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
index 73b0a14..1c09d57 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
@@ -74,4 +74,9 @@ public class LatestEventsPerProcessorQuery implements 
CachedQuery {
         return Optional.of(eventIds);
     }
 
+    @Override
+    public String toString() {
+        return "Latest Events Per Processor";
+    }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
index 94cd013..6002521 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
@@ -52,4 +52,8 @@ public class LatestEventsQuery implements CachedQuery {
         }
     }
 
+    @Override
+    public String toString() {
+        return "Most Recent Events Query";
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
index f4b96c6..f8cf709 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -606,11 +606,14 @@ public class LuceneEventIndex implements EventIndex {
                 querySubmissionMap.put(query.getIdentifier(), submission);
 
                 final List<Long> eventIds = eventIdListOption.get();
+                logger.debug("Cached Query {} produced {} Event IDs for {}: 
{}", cachedQuery, eventIds.size(), query, eventIds);
 
                 queryExecutor.submit(() -> {
                     List<ProvenanceEventRecord> events;
                     try {
                         events = eventStore.getEvents(eventIds, authorizer, 
EventTransformer.EMPTY_TRANSFORMER);
+                        logger.debug("Retrieved {} of {} Events from Event 
Store", events.size(), eventIds.size());
+
                         submission.getResult().update(events, eventIds.size());
                     } catch (final Exception e) {
                         submission.getResult().setError("Failed to retrieve 
Provenance Events from store; see logs for more details");
@@ -639,7 +642,7 @@ public class LuceneEventIndex implements EventIndex {
         querySubmissionMap.put(query.getIdentifier(), submission);
 
         final org.apache.lucene.search.Query luceneQuery = 
LuceneUtil.convertQuery(query);
-        logger.debug("Submitting query {} with identifier {} against index 
directories {}", luceneQuery, query.getIdentifier(), indexDirectories);
+        logger.debug("Submitting query {} with identifier {} against {} index 
directories: {}", luceneQuery, query.getIdentifier(), indexDirectories.size(), 
indexDirectories);
 
         if (indexDirectories.isEmpty()) {
             submission.getResult().update(Collections.emptyList(), 0L);
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
index 13c0367..368a19d 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
@@ -19,6 +19,8 @@ package org.apache.nifi.provenance.index.lucene;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.ProgressiveResult;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -105,7 +107,7 @@ public class QueryTask implements Runnable {
 
         try {
             final long borrowMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart);
-            logger.debug("Borrowing index searcher for {} took {} ms", 
indexDir, borrowMillis);
+            logger.trace("Borrowing index searcher for {} took {} ms", 
indexDir, borrowMillis);
             final long startNanos = System.nanoTime();
 
             // If max number of results are retrieved, do not bother querying 
lucene
@@ -124,7 +126,11 @@ public class QueryTask implements Runnable {
             final IndexReader indexReader = 
searcher.getIndexSearcher().getIndexReader();
             final TopDocs topDocs;
             try {
-                topDocs = searcher.getIndexSearcher().search(query, 
maxResults);
+
+                // Sort based on document id, descending. This gives us most 
recent events first.
+                final Sort sort = new Sort(new SortField(null, 
SortField.Type.DOC, true));
+
+                topDocs = searcher.getIndexSearcher().search(query, 
maxResults, sort);
             } catch (final Exception e) {
                 logger.error("Failed to query Lucene for index " + indexDir, 
e);
                 queryResult.setError("Failed to query Lucene for index " + 
indexDir + " due to " + e);
@@ -189,7 +195,7 @@ public class QueryTask implements Runnable {
 
         final long endConvert = System.nanoTime();
         final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start);
-        logger.debug("Converting documents took {} ms", ms);
+        logger.trace("Converting documents took {} ms", ms);
 
         List<ProvenanceEventRecord> events;
         try {
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
index 782e7ab..a320f5a 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
@@ -62,7 +62,8 @@ public abstract class PartitionedEventStore implements 
EventStore {
     @Override
     public void initialize() throws IOException {
         maintenanceExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Provenance Repository Maintenance"));
-        maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 
1, 1, TimeUnit.MINUTES);
+        final long maintenanceMillis = 
repoConfig.getMaintenanceFrequency(TimeUnit.MILLISECONDS);
+        maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, 
maintenanceMillis, maintenanceMillis, TimeUnit.MILLISECONDS);
 
         for (final EventStorePartition partition : getPartitions()) {
             partition.initialize();
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 68d2b3d..9d805f9 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -175,7 +175,7 @@ public class WriteAheadStorePartition implements 
EventStorePartition {
         }
 
         // Claim a Record Writer Lease so that we have a writer to persist the 
events to
-        RecordWriterLease lease = null;
+        RecordWriterLease lease;
         while (true) {
             lease = getLease();
             if (lease.tryClaim()) {
@@ -185,7 +185,6 @@ public class WriteAheadStorePartition implements 
EventStorePartition {
             final RolloverState rolloverState = lease.getRolloverState();
             if (rolloverState.isRollover()) {
                 final boolean success = tryRollover(lease);
-
                 if (success) {
                     logger.info("Successfully rolled over Event Writer for {} 
due to {}", this, rolloverState);
                 }
@@ -476,9 +475,16 @@ public class WriteAheadStorePartition implements 
EventStorePartition {
     public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
         final long timeCutoff = System.currentTimeMillis() - 
unit.toMillis(olderThan);
 
-        getEventFilesFromDisk().filter(file -> file.lastModified() < 
timeCutoff)
+        final List<File> removed = getEventFilesFromDisk().filter(file -> 
file.lastModified() < timeCutoff)
             .sorted(DirectoryUtils.SMALLEST_ID_FIRST)
-            .forEach(this::delete);
+            .filter(this::delete)
+            .collect(Collectors.toList());
+
+        if (removed.isEmpty()) {
+            logger.debug("No Provenance Event files that exceed time-based 
threshold of {} {}", olderThan, unit);
+        } else {
+            logger.info("Purged {} Provenance Event files from Provenance 
Repository because the events were older than {} {}: {}", removed.size(), 
olderThan, unit, removed);
+        }
     }
 
     private File getActiveEventFile() {
@@ -489,20 +495,27 @@ public class WriteAheadStorePartition implements 
EventStorePartition {
     @Override
     public long purgeOldestEvents() {
         final List<File> eventFiles = 
getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
-        if (eventFiles.isEmpty()) {
+        if (eventFiles.size() < 2) {
+            // If there are no Event Files, there's nothing to do. If there is 
exactly 1 Event File, it means that the only Event File
+            // that exists is the Active Event File, which we are writing to, 
so we don't want to remove it either.
             return 0L;
         }
 
         final File currentFile = getActiveEventFile();
+        if (currentFile == null) {
+            logger.debug("There is currently no Active Event File for {}. Will 
not purge oldest events until the Active Event File has been established.", 
this);
+            return 0L;
+        }
+
         for (final File eventFile : eventFiles) {
             if (eventFile.equals(currentFile)) {
-                continue;
+                break;
             }
 
             final long fileSize = eventFile.length();
 
             if (delete(eventFile)) {
-                logger.debug("{} Deleted {} event file ({}) due to storage 
limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
+                logger.info("{} Deleted {} event file ({}) due to storage 
limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
                 return fileSize;
             } else {
                 logger.warn("{} Failed to delete oldest event file {}. This 
file should be cleaned up manually.", this, eventFile);
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
new file mode 100644
index 0000000..734766f
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class TerminateFlowFile extends AbstractProcessor {
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        session.remove(flowFile);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 003c917..6d30e3c 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,8 +14,9 @@
 # limitations under the License.
 
 org.apache.nifi.processors.tests.system.CountEvents
+org.apache.nifi.processors.tests.system.FakeProcessor
+org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
 org.apache.nifi.processors.tests.system.GenerateFlowFile
 org.apache.nifi.processors.tests.system.Sleep
+org.apache.nifi.processors.tests.system.TerminateFlowFile
 org.apache.nifi.processors.tests.system.ValidateFileExists
-org.apache.nifi.processors.tests.system.FakeProcessor
-org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
\ No newline at end of file
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java
index 85a839c..28dc2ec 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java
@@ -18,6 +18,8 @@ package org.apache.nifi.tests.system;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class InstanceConfiguration {
     private final File bootstrapConfigFile;
@@ -25,6 +27,7 @@ public class InstanceConfiguration {
     private final File flowXmlGz;
     private final File stateDirectory;
     private final boolean autoStart;
+    private final Map<String, String> nifiPropertiesOverrides;
 
     private InstanceConfiguration(Builder builder) {
         this.bootstrapConfigFile = builder.bootstrapConfigFile;
@@ -32,6 +35,7 @@ public class InstanceConfiguration {
         this.flowXmlGz = builder.flowXmlGz;
         this.stateDirectory = builder.stateDirectory;
         this.autoStart = builder.autoStart;
+        this.nifiPropertiesOverrides = builder.nifiPropertiesOverrides;
     }
 
     public File getBootstrapConfigFile() {
@@ -54,12 +58,26 @@ public class InstanceConfiguration {
         return autoStart;
     }
 
+    public Map<String, String> getNifiPropertiesOverrides() {
+        return nifiPropertiesOverrides;
+    }
+
     public static class Builder {
         private File bootstrapConfigFile;
         private File instanceDirectory;
         private File flowXmlGz;
         private File stateDirectory;
         private boolean autoStart = true;
+        private final Map<String, String> nifiPropertiesOverrides = new 
HashMap<>();
+
+        public Builder overrideNifiProperties(final Map<String, String> 
overrides) {
+            nifiPropertiesOverrides.clear();
+            if (overrides != null) {
+                nifiPropertiesOverrides.putAll(overrides);
+            }
+
+            return this;
+        }
 
         public Builder bootstrapConfig(final File configFile) {
             if (!configFile.exists()) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index bb7b54b..a9806a0 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -20,6 +20,7 @@ import 
org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
@@ -47,6 +48,8 @@ import org.apache.nifi.web.api.dto.VariableDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
@@ -67,6 +70,7 @@ import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
 import org.apache.nifi.web.api.entity.VariableEntity;
@@ -80,6 +84,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -772,4 +777,45 @@ public class NiFiClientUtil {
         final ProcessGroupEntity childGroup = 
nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, 
childGroupEntity);
         return childGroup;
     }
+
+    public ProvenanceEntity queryProvenance(final Map<SearchableField, String> 
searchTerms, final Long startTime, final Long endTime) throws 
NiFiClientException, IOException {
+        final Map<String, String> searchTermsAsStrings = 
searchTerms.entrySet().stream()
+            .collect(Collectors.toMap(entry -> 
entry.getKey().getSearchableFieldName(), Map.Entry::getValue));
+
+        final ProvenanceRequestDTO requestDto = new ProvenanceRequestDTO();
+        requestDto.setSearchTerms(searchTermsAsStrings);
+        requestDto.setSummarize(false);
+        requestDto.setStartDate(startTime == null ? null : new 
Date(startTime));
+        requestDto.setEndDate(endTime == null ? null : new Date(endTime));
+        requestDto.setMaxResults(1000);
+
+        final ProvenanceDTO dto = new ProvenanceDTO();
+        dto.setRequest(requestDto);
+        dto.setSubmissionTime(new Date());
+
+        final ProvenanceEntity entity = new ProvenanceEntity();
+        entity.setProvenance(dto);
+
+        ProvenanceEntity responseEntity = 
nifiClient.getProvenanceClient().submitProvenanceQuery(entity);
+
+        try {
+            responseEntity = waitForComplete(responseEntity);
+        } catch (final InterruptedException ie) {
+            Assert.fail("Interrupted while waiting for Provenance Query to 
complete");
+        }
+
+        
nifiClient.getProvenanceClient().deleteProvenanceQuery(responseEntity.getProvenance().getId());
+        return responseEntity;
+    }
+
+    public ProvenanceEntity waitForComplete(final ProvenanceEntity entity) 
throws InterruptedException, NiFiClientException, IOException {
+        ProvenanceEntity current = entity;
+        while (current.getProvenance().isFinished() != Boolean.TRUE) {
+            Thread.sleep(100L);
+            current = 
nifiClient.getProvenanceClient().getProvenanceQuery(entity.getProvenance().getId());
+        }
+
+        return current;
+    }
+
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 04a376b..1e37b01 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -33,6 +33,8 @@ import org.junit.rules.Timeout;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
@@ -98,6 +100,10 @@ public abstract class NiFiSystemIT {
             if (isDestroyFlowAfterEachTest()) {
                 destroyFlow();
             }
+
+            if (isDestroyEnvironmentAfterEachTest()) {
+                cleanup();
+            }
         } finally {
             if (nifiClient != null) {
                 nifiClient.close();
@@ -105,6 +111,10 @@ public abstract class NiFiSystemIT {
         }
     }
 
+    protected boolean isDestroyEnvironmentAfterEachTest() {
+        return false;
+    }
+
     protected void destroyFlow() throws NiFiClientException, IOException {
         getClientUtil().stopProcessGroupComponents("root");
         getClientUtil().disableControllerServices("root");
@@ -229,16 +239,25 @@ public abstract class NiFiSystemIT {
             new InstanceConfiguration.Builder()
                 
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
                 .instanceDirectory("target/standalone-instance")
+                .overrideNifiProperties(getNifiPropertiesOverrides())
                 .build());
     }
 
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        return Collections.emptyMap();
+    }
+
     protected boolean isDestroyFlowAfterEachTest() {
         return true;
     }
 
     protected void waitFor(final BooleanSupplier condition) throws 
InterruptedException {
+        waitFor(condition, 10L);
+    }
+
+    protected void waitFor(final BooleanSupplier condition, final long 
delayMillis) throws InterruptedException {
         while (!condition.getAsBoolean()) {
-            Thread.sleep(10L);
+            Thread.sleep(delayMillis);
         }
     }
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
index 081eb1e..c19a755 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
@@ -133,6 +133,24 @@ public class SpawnedStandaloneNiFiInstanceFactory 
implements NiFiInstanceFactory
                 final File destinationFlowXmlGz = new File(destinationConf, 
"flow.xml.gz");
                 Files.copy(flowXmlGz.toPath(), destinationFlowXmlGz.toPath());
             }
+
+            // Write out any Property overrides
+            final Map<String, String> nifiPropertiesOverrides = 
instanceConfiguration.getNifiPropertiesOverrides();
+            if (nifiPropertiesOverrides != null && 
!nifiPropertiesOverrides.isEmpty()) {
+                final File destinationNifiProperties = new 
File(destinationConf, "nifi.properties");
+                final File sourceNifiProperties = new 
File(bootstrapConfigFile.getParentFile(), "nifi.properties");
+
+                final Properties nifiProperties = new Properties();
+                try (final InputStream fis = new 
FileInputStream(sourceNifiProperties)) {
+                    nifiProperties.load(fis);
+                }
+
+                nifiPropertiesOverrides.forEach(nifiProperties::setProperty);
+
+                try (final OutputStream fos = new 
FileOutputStream(destinationNifiProperties)) {
+                    nifiProperties.store(fos, null);
+                }
+            }
         }
 
         private void copyContents(final File dir, final File destinationDir) 
throws IOException {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
new file mode 100644
index 0000000..2a5481d
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProvenanceRepositoryIT extends NiFiSystemIT {
+
+    @Override
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        final Map<String, String> properties = new HashMap<>();
+
+        // Force only a single Provenance Event File to exist
+        properties.put("nifi.provenance.repository.max.storage.size", "1 KB");
+
+        // Perform maintenance every 2 seconds to ensure that we don't have to 
wait a long time for old event files to roll off.
+        properties.put("nifi.provenance.repository.maintenance.frequency", "2 
secs");
+
+        return properties;
+    }
+
+    @Override
+    protected boolean isDestroyEnvironmentAfterEachTest() {
+        // We need to destroy entire environment after each test to ensure 
that the repositories are destroyed.
+        // This is important because we are expecting exact numbers of events 
in the repo.
+        return true;
+    }
+
+    @Test
+    public void testSimpleQueryByComponentID() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity generateFlowFile = 
getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity count = 
getClientUtil().createProcessor("CountEvents");
+        getClientUtil().setAutoTerminatedRelationships(count, "success");
+        getClientUtil().createConnection(generateFlowFile, count, "success");
+
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getNifiClient().getProcessorClient().startProcessor(count);
+
+        final Map<SearchableField, String> searchTerms = 
Collections.singletonMap(SearchableFields.ComponentID, 
generateFlowFile.getId());
+        ProvenanceEntity provenanceEntity = 
getClientUtil().queryProvenance(searchTerms, null, null);
+        assertEquals(0, 
provenanceEntity.getProvenance().getResults().getProvenanceEvents().size());
+
+        // Wait for there to be at least 1 event.
+        waitForEventCountAtLeast(searchTerms, 1);
+
+        provenanceEntity = getClientUtil().queryProvenance(searchTerms, null, 
null);
+
+        final List<ProvenanceEventDTO> events = 
provenanceEntity.getProvenance().getResults().getProvenanceEvents();
+        assertEquals(1, events.size());
+
+        final ProvenanceEventDTO firstEvent = events.get(0);
+        assertEquals(ProvenanceEventType.CREATE.name(), 
firstEvent.getEventType());
+    }
+
+
+
+    // If we add some events for Component ABC and then they age off, we 
should be able to query and get back 0 results.
+    // If we then add some more events for Component ABC and query, we should 
see those new events. Even if we have aged off
+    // 1000+ events (1000 = max results of the provenance query). This should 
be true whether NiFi is restarted in between or not.
+    // To ensure this, we have two tests that are very similar but one 
restarts NiFi in between and one does not.
+    // This test does not restart NiFi.
+    @Test
+    public void testAgeOffEventsThenAddSomeThenQuery() throws 
NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generateFlowFile = 
getClientUtil().createProcessor("GenerateFlowFile");
+        generateFlowFile = 
getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Batch Size", "800"));
+
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+        getClientUtil().setAutoTerminatedRelationships(terminate, "success");
+        getClientUtil().createConnection(generateFlowFile, terminate, 
"success");
+
+        generateFlowFile = 
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        final Map<SearchableField, String> generateSearchTerms = 
Collections.singletonMap(SearchableFields.ComponentID, 
generateFlowFile.getId());
+
+        // Wait for there to be at least 1000 events for Generate processor 
and then stop the processor
+        waitForEventCountAtLeast(generateSearchTerms, 800);
+        getNifiClient().getProcessorClient().stopProcessor(generateFlowFile);
+
+        // Start Terminate proc & wait for at least 600 events to be 
registered. We do this because each Event File can hold up to 1,000 Events.
+        // The GenerateFlowFile would have 800. The first 200 events from 
Terminate will be in the first Event File, causing that one to
+        // roll over and subsequently be aged off. The second Event File will 
hold the other 600. So we may have 600 or 800 events,
+        // depending on when the query is executed.
+        getNifiClient().getProcessorClient().startProcessor(terminate);
+        final Map<SearchableField, String> terminateSearchTerms = 
Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
+        waitForEventCountAtLeast(terminateSearchTerms, 600);
+
+        waitForEventCountExactly(generateSearchTerms, 0);
+
+        // Emit 25 more events
+        getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Batch Size", "25"));
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        // Wait for those 25 events to be emitted
+        waitForEventCountAtLeast(generateSearchTerms, 25);
+    }
+
+
+    // If we add some events for Component ABC and then they age off, we 
should be able to query and get back 0 results.
+    // If we then add some more events for Component ABC and query, we should 
see those new events. Even if we have aged off
+    // 1000+ events (1000 = max results of the provenance query). This should 
be true whether NiFi is restarted in between or not.
+    // To ensure this, we have two tests that are very similar but one 
restarts NiFi in between and one does not.
+    // This test does restart NiFi.
+    @Test
+    public void testAgeOffEventsThenRestartAddSomeThenQuery() throws 
NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generateFlowFile = 
getClientUtil().createProcessor("GenerateFlowFile");
+        generateFlowFile = 
getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Batch Size", "800"));
+
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+        getClientUtil().setAutoTerminatedRelationships(terminate, "success");
+        getClientUtil().createConnection(generateFlowFile, terminate, 
"success");
+
+        generateFlowFile = 
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        final Map<SearchableField, String> generateSearchTerms = 
Collections.singletonMap(SearchableFields.ComponentID, 
generateFlowFile.getId());
+
+        // Wait for there to be at least 800 events for Generate processor and 
then stop it
+        waitForEventCountAtLeast(generateSearchTerms, 800);
+        getNifiClient().getProcessorClient().stopProcessor(generateFlowFile);
+
+        // Start Terminate proc & wait for at least 600 events to be 
registered. We do this because each Event File can hold up to 1,000 Events.
+        // The GenerateFlowFile would have 800. The first 200 events from 
Terminate will be in the first Event File, causing that one to
+        // roll over and subsequently be aged off. The second Event File will 
hold the other 600. So we may have 600 or 800 events,
+        // depending on when the query is executed.
+        getNifiClient().getProcessorClient().startProcessor(terminate);
+        final Map<SearchableField, String> terminateSearchTerms = 
Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
+        waitForEventCountAtLeast(terminateSearchTerms, 600);
+        getNifiClient().getProcessorClient().stopProcessor(terminate);
+
+        waitForEventCountExactly(generateSearchTerms, 0);
+
+        // Restart NiFi. We do this so that when we query provenance for the 
Processor we won't be able to use the "Cached" events
+        // and will instead have to query Lucene
+        getNiFiInstance().stop();
+        getNiFiInstance().start();
+
+        // Ensure that Terminate processor is stopped, since nifi could have 
shutdown before persisting flow.xml.gz
+        terminate.getRevision().setVersion(0L); // Reset the revision
+        getNifiClient().getProcessorClient().stopProcessor(terminate);
+        getClientUtil().waitForStoppedProcessor(terminate.getId());
+
+        // Emit 400 more events
+        generateFlowFile.getRevision().setVersion(0L); // Reset the revision
+        getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Batch Size", "400"));
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        // Since we restarted, the previous Event File will be rolled over. 
And since it will be > 1 KB in size, it will age off almost immediately.
+        // This will leave us with only the 400 newly created events.
+        waitForEventCountExactly(generateSearchTerms, 400);
+    }
+
+    private void waitForEventCountExactly(final Map<SearchableField, String> 
searchTerms, final int expectedCount) throws InterruptedException {
+        waitForEventCount(searchTerms, count -> count == expectedCount);
+    }
+
+    private void waitForEventCountAtLeast(final Map<SearchableField, String> 
searchTerms, final int expectedCount) throws InterruptedException {
+        waitForEventCount(searchTerms, count -> count >= expectedCount);
+    }
+
+    private void waitForEventCount(final Map<SearchableField, String> 
searchTerms, final Predicate<Integer> predicate) throws InterruptedException {
+        // Wait for there to be at least 1000 events for Generate processor
+        waitFor(() -> {
+            try {
+                return predicate.test(getEventCount(searchTerms));
+            } catch (final Exception e) {
+                return false;
+            }
+        }, 500L);
+    }
+
+    private int getEventCount(final Map<SearchableField, String> searchTerms) 
throws NiFiClientException, IOException {
+        ProvenanceEntity provEntity = 
getClientUtil().queryProvenance(searchTerms, null, null);
+        return 
provEntity.getProvenance().getResults().getProvenanceEvents().size();
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 0b4aa9d..8a37b68 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -96,6 +96,7 @@ 
nifi.provenance.repository.directory.default=./provenance_repository
 nifi.provenance.repository.max.storage.time=24 hours
 nifi.provenance.repository.max.storage.size=1 GB
 nifi.provenance.repository.rollover.time=30 secs
+nifi.provenance.repository.rollover.events=1000
 nifi.provenance.repository.rollover.size=100 MB
 nifi.provenance.repository.query.threads=2
 nifi.provenance.repository.index.threads=2
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
index 499157c..804a60d 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
@@ -33,6 +33,7 @@ import 
org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.PoliciesClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
@@ -373,6 +374,21 @@ public class NiFiClientFactory implements 
ClientFactory<NiFiClient> {
         }
 
         @Override
+        public ProvenanceClient getProvenanceClient() {
+            return wrappedClient.getProvenanceClient();
+        }
+
+        @Override
+        public ProvenanceClient getProvenanceClientForProxiedEntities(final 
String... proxiedEntity) {
+            return 
wrappedClient.getProvenanceClientForProxiedEntities(proxiedEntity);
+        }
+
+        @Override
+        public ProvenanceClient getProvenanceClientForToken(final String 
token) {
+            return wrappedClient.getProvenanceClientForToken(token);
+        }
+
+        @Override
         public void close() throws IOException {
             wrappedClient.close();
         }
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
index 3c66fe3..1a7e7c2 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
@@ -184,6 +184,14 @@ public interface NiFiClient extends Closeable {
 
     OutputPortClient getOutputPortClientForToken(String token);
 
+    // ----- ProvenanceClient -----
+
+    ProvenanceClient getProvenanceClient();
+
+    ProvenanceClient getProvenanceClientForProxiedEntities(String... 
proxiedEntity);
+
+    ProvenanceClient getProvenanceClientForToken(String token);
+
 
     /**
      * The builder interface that implementations should provide for obtaining 
the client.
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
new file mode 100644
index 0000000..60774c9
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.client.nifi;
+
+import org.apache.nifi.web.api.entity.LineageEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+
+import java.io.IOException;
+
+public interface ProvenanceClient {
+    ProvenanceEntity submitProvenanceQuery(ProvenanceEntity provenanceQuery) 
throws NiFiClientException, IOException;
+
+    ProvenanceEntity getProvenanceQuery(String queryId) throws 
NiFiClientException, IOException;
+
+    ProvenanceEntity deleteProvenanceQuery(String queryId) throws 
NiFiClientException, IOException;
+
+    LineageEntity submitLineageRequest(LineageEntity lineageEntity) throws 
NiFiClientException, IOException;
+
+    LineageEntity getLineageRequest(String lineageRequestId) throws 
NiFiClientException, IOException;
+
+    LineageEntity deleteLineageRequest(String lineageRequestId) throws 
NiFiClientException, IOException;
+}
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
index fbc71e6..e6b1014 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
@@ -35,6 +35,7 @@ import 
org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.PoliciesClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
@@ -393,6 +394,23 @@ public class JerseyNiFiClient implements NiFiClient {
     }
 
     @Override
+    public ProvenanceClient getProvenanceClient() {
+        return new JerseyProvenanceClient(baseTarget);
+    }
+
+    @Override
+    public ProvenanceClient getProvenanceClientForProxiedEntities(final 
String... proxiedEntity) {
+        final Map<String, String> headers = getHeaders(proxiedEntity);
+        return new JerseyProvenanceClient(baseTarget, headers);
+    }
+
+    @Override
+    public ProvenanceClient getProvenanceClientForToken(final String token) {
+        final Map<String, String> headers = getHeadersWithToken(token);
+        return new JerseyProvenanceClient(baseTarget, headers);
+    }
+
+    @Override
     public void close() {
         if (this.client != null) {
             try {
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
new file mode 100644
index 0000000..1cbdad3
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.client.nifi.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
+import org.apache.nifi.web.api.entity.LineageEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class JerseyProvenanceClient extends AbstractJerseyClient implements 
ProvenanceClient {
+    private final WebTarget provenanceTarget;
+
+    public JerseyProvenanceClient(final WebTarget baseTarget) {
+        this(baseTarget, Collections.emptyMap());
+    }
+
+    public JerseyProvenanceClient(final WebTarget baseTarget, final 
Map<String, String> headers) {
+        super(headers);
+        this.provenanceTarget = baseTarget.path("/provenance");
+    }
+
+    @Override
+    public ProvenanceEntity submitProvenanceQuery(final ProvenanceEntity 
provenanceEntity) throws NiFiClientException, IOException {
+        if (provenanceEntity == null) {
+            throw new IllegalArgumentException("Provenance entity cannot be 
null");
+        }
+
+        return executeAction("Error submitting Provenance Query", () -> 
getRequestBuilder(provenanceTarget).post(
+            Entity.entity(provenanceEntity, MediaType.APPLICATION_JSON_TYPE),
+            ProvenanceEntity.class
+        ));
+    }
+
+    @Override
+    public ProvenanceEntity getProvenanceQuery(final String queryId) throws 
NiFiClientException, IOException {
+        if (StringUtils.isBlank(queryId)) {
+            throw new IllegalArgumentException("Query ID cannot be null");
+        }
+
+        return executeAction("Error retrieving status of Provenance Query", () 
-> {
+            final WebTarget target = 
provenanceTarget.path("/{id}").resolveTemplate("id", queryId);
+            return getRequestBuilder(target).get(ProvenanceEntity.class);
+        });
+    }
+
+    @Override
+    public ProvenanceEntity deleteProvenanceQuery(final String 
provenanceQueryId) throws NiFiClientException, IOException {
+        if (provenanceQueryId == null) {
+            throw new IllegalArgumentException("Provenance Query ID cannot be 
null");
+        }
+
+        return executeAction("Error deleting Provenance Query", () -> {
+            final WebTarget target = provenanceTarget.path("/{id}")
+                .resolveTemplate("id", provenanceQueryId);
+
+            return getRequestBuilder(target).delete(ProvenanceEntity.class);
+        });
+    }
+
+    @Override
+    public LineageEntity submitLineageRequest(final LineageEntity 
lineageEntity) throws NiFiClientException, IOException {
+        if (lineageEntity == null) {
+            throw new IllegalArgumentException("Lineage entity cannot be 
null");
+        }
+
+        return executeAction("Error submitting Provenance Lineage Request", () 
-> getRequestBuilder(provenanceTarget.path("lineage")).post(
+            Entity.entity(lineageEntity, MediaType.APPLICATION_JSON_TYPE),
+            LineageEntity.class
+        ));
+    }
+
+    @Override
+    public LineageEntity getLineageRequest(final String lineageRequestId) 
throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(lineageRequestId)) {
+            throw new IllegalArgumentException("Lineage Request ID cannot be 
null");
+        }
+
+        return executeAction("Error retrieving status of Provenance Lineage 
Request", () -> {
+            final WebTarget target = 
provenanceTarget.path("/lineage/{id}").resolveTemplate("id", lineageRequestId);
+            return getRequestBuilder(target).get(LineageEntity.class);
+        });
+    }
+
+    @Override
+    public LineageEntity deleteLineageRequest(final String lineageRequestId) 
throws NiFiClientException, IOException {
+        if (lineageRequestId == null) {
+            throw new IllegalArgumentException("Lineage Request ID cannot be 
null");
+        }
+
+        return executeAction("Error deleting Provenance Lineage Request", () 
-> {
+            final WebTarget target = provenanceTarget.path("/lineage/{id}")
+                .resolveTemplate("id", lineageRequestId);
+
+            return getRequestBuilder(target).delete(LineageEntity.class);
+        });
+
+    }
+}

Reply via email to