Merge branch 'NIFI-756' into NIFI-744
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e172d663 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e172d663 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e172d663 Branch: refs/heads/NIFI-744 Commit: e172d663dc3f9265854a66832b1d8694e1179a67 Parents: bcdc586 e36f17c Author: Mark Payne <[email protected]> Authored: Wed Aug 5 17:51:27 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Aug 6 08:34:33 2015 -0400 ---------------------------------------------------------------------- .../PersistentProvenanceRepository.java | 200 ++++++++++++++++++- .../nifi/provenance/lucene/DocsReader.java | 19 +- .../nifi/provenance/lucene/IndexSearch.java | 8 +- .../TestPersistentProvenanceRepository.java | 43 ++-- 4 files changed, 249 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e172d663/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 4d02d18,06c3d83..4476126 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@@ -944,12 -937,171 +947,174 @@@ public class PersistentProvenanceReposi updated = idToPathMap.compareAndSet(existingPathMap, newPathMap); logger.debug("After expiration, path map: {}", newPathMap); 
} + + purgeExpiredIndexes(); + } + + /* Removes the index directory with the earliest timestamp (indexDirs.get(0)) when the start timestamp of the next index is <= the time of the first event in the first file returned by getSortedLogFiles(); at most one index directory is removed per call. Every path through the method also updates firstEventTimestamp. */ private void purgeExpiredIndexes() throws IOException { + // Now that we have potentially removed expired Provenance Event Log Files, we can look at + // whether or not we can delete any of the indexes. An index can be deleted if all of the + // data that is associated with that index has already been deleted. In order to test this, + // we will get the timestamp of the earliest event and then compare that to the latest timestamp + // that would be indexed by the earliest index. If the event occurred after the timestamp of + // the latest index, then we can just delete the entire index altogether. + + // find all of the index directories + final List<File> indexDirs = getAllIndexDirectories(); + if (indexDirs.size() < 2) { + this.firstEventTimestamp = determineFirstEventTimestamp(); + return; + } + + // Indexes are named "index-XXX" where the XXX is the timestamp of the earliest event that + // could be in the index. Once we have finished with one index, we move on to another index, + // but we don't move on until we are finished with the previous index. + // Therefore, an efficient way to determine the latest timestamp of one index is to look at the + // timestamp of the next index (these could potentially overlap for one millisecond). This is + // efficient because we can determine the earliest timestamp of an index simply by looking at + // the name of the Index's directory. + final long latestTimestampOfFirstIndex = getIndexTimestamp(indexDirs.get(1)); + + // Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event + // in the event file. 
+ final List<File> logFiles = getSortedLogFiles(); + if (logFiles.isEmpty()) { + this.firstEventTimestamp = System.currentTimeMillis(); + return; + } + + final File firstLogFile = logFiles.get(0); + long earliestEventTime = System.currentTimeMillis(); + long maxEventId = -1L; + try (final RecordReader reader = RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) { + final StandardProvenanceEventRecord event = reader.nextRecord(); /* NOTE(review): event is not null-checked; if nextRecord() can return null for an empty file (TODO confirm), the next line throws NullPointerException, not IOException. */ + earliestEventTime = event.getEventTime(); + + try { + maxEventId = reader.getMaxEventId(); + } catch (final IOException ioe) { /* NOTE(review): ioe is not passed to the logger, so the cause of the failure is not recorded. */ + logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of " + + "events in the Provenance Repository may be inaccurate.", firstLogFile); + } + } + + // check if we can delete the index safely. + if (latestTimestampOfFirstIndex <= earliestEventTime) { + // we can safely delete the first index because the latest event in the index is an event + // that has already been expired from the repository. + final File indexingDirectory = indexDirs.get(0); + indexManager.removeIndex(indexingDirectory); + indexConfig.removeIndexDirectory(indexingDirectory); + deleteDirectory(indexingDirectory); + + if (maxEventId > -1L) { /* NOTE(review): maxEventId was read from firstLogFile, which is not deleted here; confirm that moving the minimum indexed ID past that file's last event is intended. */ + indexConfig.setMinIdIndexed(maxEventId + 1L); + } + } + + this.firstEventTimestamp = earliestEventTime; + } + + private long determineFirstEventTimestamp() { + // Return the time of the first event in the first Provenance Event Log File whose reader opens and whose first + // record reads without an IOException; returns 0L if there are no log files or none of them can be read. 
+ final List<File> logFiles = getSortedLogFiles(); + if (logFiles.isEmpty()) { /* NOTE(review): purgeExpiredIndexes() uses System.currentTimeMillis() rather than 0L when there are no log files. */ + return 0L; + } + + for (final File logFile : logFiles) { + try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) { + final StandardProvenanceEventRecord event = reader.nextRecord(); /* NOTE(review): a null record (TODO confirm whether nextRecord() can return one) would throw NullPointerException, which the IOException handler below does not catch. */ + return event.getEventTime(); + } catch (final IOException ioe) { /* NOTE(review): ioe is not passed to the logger. */ + logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile); + } + } + + return 0L; + } + + /** + * Recursively deletes the given directory. If unable to delete the directory, will emit a WARN level + * log event and move on. + * NOTE(review): if dir.listFiles() returns null (dir is not a directory, or an I/O error occurs), this + * returns without deleting or logging anything. The WARN message for a regular file that cannot be + * deleted still refers to it as an "index directory". + * + * @param dir the directory to delete + */ + private void deleteDirectory(final File dir) { + if (dir == null || !dir.exists()) { + return; + } + + final File[] children = dir.listFiles(); + if (children == null) { + return; + } + + for (final File child : children) { + if (child.isDirectory()) { + deleteDirectory(child); + } else if (!child.delete()) { + logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", child.getAbsolutePath()); + } + } + + if (!dir.delete()) { + logger.warn("Unable to remove index directory {}; this directory should be cleaned up manually", dir); + } + } + + + /** + * Scans every configured storage directory for entries whose names match INDEX_PATTERN; a storage + * directory whose listFiles call returns null is skipped. 
+ * + * @return a List of all Index directories, sorted by timestamp of the earliest event that could + * be present in the index + */ + private List<File> getAllIndexDirectories() { + final List<File> allIndexDirs = new ArrayList<>(); + for (final File storageDir : configuration.getStorageDirectories()) { + final File[] indexDirs = storageDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(final File dir, final String name) { + return INDEX_PATTERN.matcher(name).matches(); + } + }); + + if (indexDirs != null) { + for (final File indexDir : indexDirs) { + allIndexDirs.add(indexDir); + } + } + } + /* Merge resolution: Collections.sort replaces List.sort, presumably to stay compatible with Java 7 (List.sort was added in Java 8); confirm the project's target JDK. */ - allIndexDirs.sort(new Comparator<File>() { ++ Collections.sort(allIndexDirs, new 
Comparator<File>() { + @Override + public int compare(final File o1, final File o2) { + final long time1 = getIndexTimestamp(o1); + final long time2 = getIndexTimestamp(o2); + return Long.compare(time1, time2); + } + }); + + return allIndexDirs; + } + + /** + * Takes a File that has a filename "index-" followed by a Long and returns the + * value of that Long + * + * NOTE(review): everything after the first '-' is parsed; if the name has no '-', the whole name is + * parsed. A non-numeric remainder throws NumberFormatException. Within this view the argument always + * comes from getAllIndexDirectories (filtered by INDEX_PATTERN, which is defined outside this view). + * + * @param indexDirectory the index directory to obtain the timestamp for + * @return the timestamp associated with the given index + */ + private long getIndexTimestamp(final File indexDirectory) { + final String name = indexDirectory.getName(); + final int dashIndex = name.indexOf("-"); + return Long.parseLong(name.substring(dashIndex + 1)); } - public void waitForRollover() throws IOException { + /** + * Blocks the calling thread until the repository rolls over. This is intended for unit testing. + */ + public void waitForRollover() { final int count = rolloverCompletions.get(); while (rolloverCompletions.get() == count) { try { http://git-wip-us.apache.org/repos/asf/nifi/blob/e172d663/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java ----------------------------------------------------------------------
