Merge branch 'NIFI-756' into NIFI-744

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e172d663
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e172d663
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e172d663

Branch: refs/heads/NIFI-744
Commit: e172d663dc3f9265854a66832b1d8694e1179a67
Parents: bcdc586 e36f17c
Author: Mark Payne <[email protected]>
Authored: Wed Aug 5 17:51:27 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Aug 6 08:34:33 2015 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 200 ++++++++++++++++++-
 .../nifi/provenance/lucene/DocsReader.java      |  19 +-
 .../nifi/provenance/lucene/IndexSearch.java     |   8 +-
 .../TestPersistentProvenanceRepository.java     |  43 ++--
 4 files changed, 249 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e172d663/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 4d02d18,06c3d83..4476126
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@@ -944,12 -937,171 +947,174 @@@ public class PersistentProvenanceReposi
              updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
              logger.debug("After expiration, path map: {}", newPathMap);
          }
+ 
+         purgeExpiredIndexes();
+     }
+ 
+     private void purgeExpiredIndexes() throws IOException { // Removes the oldest index directory once all events it could cover have expired, then records firstEventTimestamp
+         // Now that we have potentially removed expired Provenance Event Log 
Files, we can look at
+         // whether or not we can delete any of the indexes. An index can be 
deleted if all of the
+         // data that is associated with that index has already been deleted. 
In order to test this,
+         // we will get the timestamp of the earliest event and then compare 
that to the latest timestamp
+         // that would be indexed by the earliest index. If the event occurred 
after the timestamp of
+         // the latest index, then we can just delete the entire index all 
together.
+ 
+         // find all of the index directories
+         final List<File> indexDirs = getAllIndexDirectories();
+         if (indexDirs.size() < 2) { // the first index's upper bound comes from the second index's name, so with fewer than two there is nothing to compare
+             this.firstEventTimestamp = determineFirstEventTimestamp();
+             return;
+         }
+ 
+         // Indexes are named "index-XXX" where the XXX is the timestamp of 
the earliest event that
+         // could be in the index. Once we have finished with one index, we 
move on to another index,
+         // but we don't move on until we are finished with the previous index.
+         // Therefore, an efficient way to determine the latest timestamp of 
one index is to look at the
+         // timestamp of the next index (these could potentially overlap for 
one millisecond). This is
+         // efficient because we can determine the earliest timestamp of an 
index simply by looking at
+         // the name of the Index's directory.
+         final long latestTimestampOfFirstIndex = 
getIndexTimestamp(indexDirs.get(1)); // treated as the newest timestamp the first index can hold
+ 
+         // Get the timestamp of the first event in the first Provenance Event 
Log File and the ID of the last event
+         // in the event file.
+         final List<File> logFiles = getSortedLogFiles();
+         if (logFiles.isEmpty()) { // no event log files remain, so nothing is purged on this pass
+             this.firstEventTimestamp = System.currentTimeMillis();
+             return;
+         }
+ 
+         final File firstLogFile = logFiles.get(0);
+         long earliestEventTime = System.currentTimeMillis(); // placeholder; replaced below by the first record's event time
+         long maxEventId = -1L; // -1 means "unknown" and skips the setMinIdIndexed update below
+         try (final RecordReader reader = 
RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) {
+             final StandardProvenanceEventRecord event = reader.nextRecord(); // NOTE(review): if nextRecord() can return null (e.g. empty file), the next line throws NPE -- confirm
+             earliestEventTime = event.getEventTime();
+ 
+             try {
+                 maxEventId = reader.getMaxEventId();
+             } catch (final IOException ioe) { // best-effort: keep -1; NOTE(review): ioe is not passed to the logger, so its cause is lost
+                 logger.warn("Unable to determine the maximum ID for 
Provenance Event Log File {}; values reported for the number of "
+                     + "events in the Provenance Repository may be 
inaccurate.", firstLogFile);
+             }
+         }
+ 
+         // check if we can delete the index safely.
+         if (latestTimestampOfFirstIndex <= earliestEventTime) {
+             // we can safely delete the first index because the latest event 
in the index is an event
+             // that has already been expired from the repository.
+             final File indexingDirectory = indexDirs.get(0);
+             indexManager.removeIndex(indexingDirectory); // detach the index from indexManager and indexConfig before deleting its files
+             indexConfig.removeIndexDirectory(indexingDirectory);
+             deleteDirectory(indexingDirectory);
+ 
+             if (maxEventId > -1L) { // advance the minimum indexed ID past the last event of the first log file
+                 indexConfig.setMinIdIndexed(maxEventId + 1L);
+             }
+         }
+ 
+         this.firstEventTimestamp = earliestEventTime;
+     }
+ 
+     private long determineFirstEventTimestamp() { // Returns the event time of the first record in the first readable log file, or 0 if there are no log files or none can be read
+         // Get the timestamp of the first event in the first Provenance Event 
Log File and the ID of the last event
+         // in the event file. NOTE(review): this method only reads the first event's timestamp; it does not obtain the last event ID.
+         final List<File> logFiles = getSortedLogFiles();
+         if (logFiles.isEmpty()) {
+             return 0L; // NOTE(review): purgeExpiredIndexes uses System.currentTimeMillis() for the same empty case -- confirm which is intended
+         }
+ 
+         for (final File logFile : logFiles) { // try files in order until one yields a first record
+             try (final RecordReader reader = 
RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
+                 final StandardProvenanceEventRecord event = 
reader.nextRecord(); // NOTE(review): a null record would NPE on the next line and is not caught by the IOException handler -- confirm nextRecord() contract
+                 return event.getEventTime();
+             } catch (final IOException ioe) { // NOTE(review): ioe is not logged, so the failure cause is lost
+                 logger.warn("Failed to obtain timestamp of first event from 
Provenance Event Log File {}", logFile);
+             }
+         }
+ 
+         return 0L; // no log file could be read
+     }
+ 
+     /**
+      * Recursively deletes the given directory. If unable to delete the 
directory, will emit a WARN level
+      * log event and move on. Does nothing if {@code dir} is null or does not exist.
+      *
+      * @param dir the directory to delete
+      */
+     private void deleteDirectory(final File dir) {
+         if (dir == null || !dir.exists()) {
+             return;
+         }
+ 
+         final File[] children = dir.listFiles();
+         if (children == null) { // NOTE(review): listFiles() returns null for a non-directory or on I/O error; returning here skips dir.delete() and logs nothing
+             return;
+         }
+ 
+         for (final File child : children) {
+             if (child.isDirectory()) {
+                 deleteDirectory(child); // failures inside the recursive call are logged there
+             } else if (!child.delete()) { // NOTE(review): the warning below says "index directory" although child is a regular file here
+                 logger.warn("Unable to remove index directory {}; this 
directory should be cleaned up manually", child.getAbsolutePath());
+             }
+         }
+ 
+         if (!dir.delete()) { // File.delete() fails on a non-empty directory, so this depends on every child having been removed above
+             logger.warn("Unable to remove index directory {}; this directory 
should be cleaned up manually", dir);
+         }
+     }
+ 
+ 
+     /**
+      * @return a List of all Index directories, sorted by timestamp of the 
earliest event that could
+      *         be present in the index. Gathers entries from every configured storage directory whose names match INDEX_PATTERN.
+      */
+     private List<File> getAllIndexDirectories() {
+         final List<File> allIndexDirs = new ArrayList<>();
+         for (final File storageDir : configuration.getStorageDirectories()) { // collect matching entries from every configured storage directory
+             final File[] indexDirs = storageDir.listFiles(new 
FilenameFilter() {
+                 @Override
+                 public boolean accept(final File dir, final String name) {
+                     return INDEX_PATTERN.matcher(name).matches(); // NOTE(review): matches by name only; a regular file matching INDEX_PATTERN would also be returned
+                 }
+             });
+ 
+             if (indexDirs != null) { // listFiles() returns null if storageDir cannot be listed
+                 for (final File indexDir : indexDirs) {
+                     allIndexDirs.add(indexDir);
+                 }
+             }
+         }
+ 
 -        allIndexDirs.sort(new Comparator<File>() {
++        Collections.sort(allIndexDirs, new Comparator<File>() { // NOTE(review): merge swapped List.sort (Java 8+) for Collections.sort, presumably for older-JDK compatibility
+             @Override
+             public int compare(final File o1, final File o2) {
+                 final long time1 = getIndexTimestamp(o1);
+                 final long time2 = getIndexTimestamp(o2);
+                 return Long.compare(time1, time2); // ascending order of the timestamp encoded in each directory name
+             }
+         });
+ 
+         return allIndexDirs;
+     }
+ 
+     /**
+      * Takes a File that has a filename "index-" followed by a Long and 
returns the
+      * value of that Long
+      *
+      * @param indexDirectory the index directory to obtain the timestamp for
+      * @return the timestamp associated with the given index
+      */
+     private long getIndexTimestamp(final File indexDirectory) {
+         final String name = indexDirectory.getName();
+         final int dashIndex = name.indexOf("-"); // NOTE(review): with no '-', indexOf returns -1 and the whole name is parsed
+         return Long.parseLong(name.substring(dashIndex + 1)); // throws NumberFormatException if the remaining text is not a valid long
      }
  
 -    public void waitForRollover() throws IOException {
 +    /**
 +     * Blocks the calling thread until the repository rolls over. This is 
intended for unit testing.
 +     */
 +    public void waitForRollover() {
          final int count = rolloverCompletions.get();
          while (rolloverCompletions.get() == count) {
              try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e172d663/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------

Reply via email to