Repository: nifi
Updated Branches:
  refs/heads/master c71409fb5 -> 49c7af03c

NIFI-3039 Provenance Repository - Fix PurgeOldEvent and Rollover Size Limits

* Added low water mark for purgeOldEvents() to more quickly purge after a large spike in events.
* Adjusted rollover high water mark to avoid overrunning "nifi.provenance.repository.max.storage.size".
* Adjusted looping logic in mergeJournals() to use ".firstKey()" instead of ".entrySet().iterator().next()" to avoid unnecessary object creation.

Signed-off-by: Mike Moser <[email protected]>

This closes #1240

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49c7af03
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49c7af03
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49c7af03

Branch: refs/heads/master
Commit: 49c7af03c1ae8a5bd7c727b9a5d59b9d7e2d487e
Parents: c71409f
Author: Joe Skora <[email protected]>
Authored: Thu Nov 17 15:30:52 2016 +0000
Committer: Mike Moser <[email protected]>
Committed: Fri Mar 9 21:49:13 2018 +0000

----------------------------------------------------------------------
 .../provenance/PersistentProvenanceRepository.java | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/49c7af03/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 604bb3f..a582990 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -137,6 +137,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
     public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5;
 
+    private static final float PURGE_OLD_EVENTS_HIGH_WATER = 0.9f;
+    private static final float PURGE_OLD_EVENTS_LOW_WATER = 0.88f;
+    private static final float ROLLOVER_HIGH_WATER = 0.99f;
+
     private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
 
     private final long maxPartitionMillis;
@@ -945,13 +949,13 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         };
 
         // If we have too much data (at least 90% of our max capacity), start aging it off
-        if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) {
+        if (bytesUsed > configuration.getMaxStorageCapacity() * PURGE_OLD_EVENTS_HIGH_WATER) {
             Collections.sort(sortedByBasename, sortByBasenameComparator);
 
             for (final File file : sortedByBasename) {
                 toPurge.add(file);
                 bytesUsed -= file.length();
-                if (bytesUsed < configuration.getMaxStorageCapacity()) {
+                if (bytesUsed < configuration.getMaxStorageCapacity() * PURGE_OLD_EVENTS_LOW_WATER) {
                     // we've shrunk the repo size down enough to stop
                     break;
                 }
@@ -1369,7 +1373,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             int journalFileCount = getJournalCount();
             long repoSize = getSize(getLogFiles(), 0L);
             final int journalCountThreshold = configuration.getJournalCount() * 5;
-            final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max capacity
+            final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * ROLLOVER_HIGH_WATER);
 
             // check if we need to apply backpressure.
             // If we have too many journal files, or if the repo becomes too large, backpressure is necessary. Without it,
@@ -1759,9 +1763,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
 
                     boolean indexEvents = true;
                     while (!recordToReaderMap.isEmpty()) {
-                        final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                        final StandardProvenanceEventRecord record = entry.getKey();
-                        final RecordReader reader = entry.getValue();
+                        final StandardProvenanceEventRecord record = recordToReaderMap.firstKey();
+                        final RecordReader reader = recordToReaderMap.get(record);
 
                         writer.writeRecord(record);
                         final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
