Repository: nifi
Updated Branches:
  refs/heads/master c71409fb5 -> 49c7af03c


NIFI-3039 Provenance Repository - Fix PurgeOldEvent and Rollover Size Limits
* Added low water mark for purgeOldEvents() to more quickly purge after a large 
spike in events.
* Adjusted rollover high water mark to avoid overrunning 
"nifi.provenance.repository.max.storage.size".
* Adjusted looping logic in mergeJournals() to use ".firstKey()" instead of 
".entrySet().iterator().next()" to avoid unnecessary object creation.

Signed-off-by: Mike Moser <[email protected]>
This closes #1240


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49c7af03
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49c7af03
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49c7af03

Branch: refs/heads/master
Commit: 49c7af03c1ae8a5bd7c727b9a5d59b9d7e2d487e
Parents: c71409f
Author: Joe Skora <[email protected]>
Authored: Thu Nov 17 15:30:52 2016 +0000
Committer: Mike Moser <[email protected]>
Committed: Fri Mar 9 21:49:13 2018 +0000

----------------------------------------------------------------------
 .../provenance/PersistentProvenanceRepository.java   | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/49c7af03/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 604bb3f..a582990 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -137,6 +137,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
     public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many 
indexing failures we will tolerate before skipping indexing for a prov file
     public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5;
 
+    private static final float PURGE_OLD_EVENTS_HIGH_WATER = 0.9f;
+    private static final float PURGE_OLD_EVENTS_LOW_WATER = 0.88f;
+    private static final float ROLLOVER_HIGH_WATER = 0.99f;
+
     private static final Logger logger = 
LoggerFactory.getLogger(PersistentProvenanceRepository.class);
 
     private final long maxPartitionMillis;
@@ -945,13 +949,13 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         };
 
         // If we have too much data (at least 90% of our max capacity), start 
aging it off
-        if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) {
+        if (bytesUsed > configuration.getMaxStorageCapacity() * 
PURGE_OLD_EVENTS_HIGH_WATER) {
             Collections.sort(sortedByBasename, sortByBasenameComparator);
 
             for (final File file : sortedByBasename) {
                 toPurge.add(file);
                 bytesUsed -= file.length();
-                if (bytesUsed < configuration.getMaxStorageCapacity()) {
+                if (bytesUsed < configuration.getMaxStorageCapacity() * 
PURGE_OLD_EVENTS_LOW_WATER) {
                     // we've shrunk the repo size down enough to stop
                     break;
                 }
@@ -1369,7 +1373,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             int journalFileCount = getJournalCount();
             long repoSize = getSize(getLogFiles(), 0L);
             final int journalCountThreshold = configuration.getJournalCount() 
* 5;
-            final long sizeThreshold = (long) 
(configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max 
capacity
+            final long sizeThreshold = (long) 
(configuration.getMaxStorageCapacity() * ROLLOVER_HIGH_WATER);
 
             // check if we need to apply backpressure.
             // If we have too many journal files, or if the repo becomes too 
large, backpressure is necessary. Without it,
@@ -1759,9 +1763,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
 
                         boolean indexEvents = true;
                         while (!recordToReaderMap.isEmpty()) {
-                            final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                            final StandardProvenanceEventRecord record = 
entry.getKey();
-                            final RecordReader reader = entry.getValue();
+                            final StandardProvenanceEventRecord record = 
recordToReaderMap.firstKey();
+                            final RecordReader reader = 
recordToReaderMap.get(record);
 
                             writer.writeRecord(record);
                             final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();

Reply via email to