Repository: nifi
Updated Branches:
  refs/heads/NIFI-744 e172d663d -> 13b3017d2


NIFI-744: Fixed checkstyle errors


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1f72fa5e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1f72fa5e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1f72fa5e

Branch: refs/heads/NIFI-744
Commit: 1f72fa5ef96eca4e10f38b424a85ce8223367882
Parents: fe25ae0
Author: Mark Payne <[email protected]>
Authored: Tue Aug 4 10:13:02 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Aug 18 09:48:38 2015 -0400

----------------------------------------------------------------------
 .../repository/FileSystemRepository.java        |   2 +-
 .../repository/StandardProcessSession.java      |  20 +-
 .../repository/claim/StandardContentClaim.java  |   6 +-
 .../PersistentProvenanceRepository.java         | 200 ++++++++++++++++++-
 .../nifi/provenance/lucene/DocsReader.java      |  19 +-
 .../nifi/provenance/lucene/IndexSearch.java     |   8 +-
 .../TestPersistentProvenanceRepository.java     |  43 ++--
 7 files changed, 263 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 31835dc..0a9acc4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -765,7 +765,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
         // see javadocs for claim.getLength() as to why we do this.
         if (claim.getLength() < 0) {
-                       return Files.size(getPath(claim, true)) - 
claim.getOffset();
+            return Files.size(getPath(claim, true)) - claim.getOffset();
         }
 
         return claim.getLength();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 14660aa..20dc1a4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1918,11 +1918,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         removeTemporaryClaim(record);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
-               .fromFlowFile(record.getCurrent())
-               .contentClaim(newClaim)
-               .contentClaimOffset(0)
-               .size(writtenHolder.getValue())
-               .build();
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(0)
+            .size(writtenHolder.getValue())
+            .build();
 
         record.setWorking(newFile);
         return newFile;
@@ -2128,11 +2128,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         removeTemporaryClaim(record);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
-               .fromFlowFile(record.getCurrent())
-               .contentClaim(newClaim)
-               .contentClaimOffset(0L)
-               .size(writtenHolder.getValue())
-               .build();
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(0L)
+            .size(writtenHolder.getValue())
+            .build();
 
         record.setWorking(newFile);
         return newFile;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 62ff276..ea047c7 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -41,11 +41,11 @@ public final class StandardContentClaim implements 
ContentClaim, Comparable<Cont
         this.length = length;
     }
 
-       @Override
-       public int hashCode() {
+    @Override
+    public int hashCode() {
         final int prime = 31;
         int result = 1;
-               result = prime * result;
+        result = prime * result;
         result = prime * result + (int) (length ^ length >>> 32);
         result = prime * result + (int) (offset ^ offset >>> 32);
         result = prime * result + (resourceClaim == null ? 0 : 
resourceClaim.hashCode());

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 4d02d18..4476126 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -20,6 +20,7 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -123,6 +124,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     private final AtomicBoolean recoveryFinished = new AtomicBoolean(false);
 
     private volatile boolean closed = false;
+    private volatile long firstEventTimestamp = 0L;
 
     // the following are all protected by the lock
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
@@ -246,7 +248,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     }
                 }, rolloverCheckMillis, rolloverCheckMillis, 
TimeUnit.MILLISECONDS);
 
-                expirationActions.add(new DeleteIndexAction(this, indexConfig, 
indexManager));
                 expirationActions.add(new FileRemovalAction());
 
                 scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
@@ -265,6 +266,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     }
                 }, 1L, 1L, TimeUnit.MINUTES);
             }
+
+            firstEventTimestamp = determineFirstEventTimestamp();
         } finally {
             writeLock.unlock();
         }
@@ -944,6 +947,168 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
             logger.debug("After expiration, path map: {}", newPathMap);
         }
+
+        purgeExpiredIndexes();
+    }
+
+    private void purgeExpiredIndexes() throws IOException {
+        // Now that we have potentially removed expired Provenance Event Log 
Files, we can look at
+        // whether or not we can delete any of the indexes. An index can be 
deleted if all of the
+        // data that is associated with that index has already been deleted. 
In order to test this,
+        // we will get the timestamp of the earliest event and then compare 
that to the latest timestamp
+        // that would be indexed by the earliest index. If the event occurred 
at or after the latest timestamp of
+        // that index, then we can just delete the entire index 
altogether.
+
+        // find all of the index directories
+        final List<File> indexDirs = getAllIndexDirectories();
+        if (indexDirs.size() < 2) {
+            this.firstEventTimestamp = determineFirstEventTimestamp();
+            return;
+        }
+
+        // Indexes are named "index-XXX" where the XXX is the timestamp of the 
earliest event that
+        // could be in the index. Once we have finished with one index, we 
move on to another index,
+        // but we don't move on until we are finished with the previous index.
+        // Therefore, an efficient way to determine the latest timestamp of 
one index is to look at the
+        // timestamp of the next index (these could potentially overlap for 
one millisecond). This is
+        // efficient because we can determine the earliest timestamp of an 
index simply by looking at
+        // the name of the Index's directory.
+        final long latestTimestampOfFirstIndex = 
getIndexTimestamp(indexDirs.get(1));
+
+        // Get the timestamp of the first event in the first Provenance Event 
Log File and the ID of the last event
+        // in the event file.
+        final List<File> logFiles = getSortedLogFiles();
+        if (logFiles.isEmpty()) {
+            this.firstEventTimestamp = System.currentTimeMillis();
+            return;
+        }
+
+        final File firstLogFile = logFiles.get(0);
+        long earliestEventTime = System.currentTimeMillis();
+        long maxEventId = -1L;
+        try (final RecordReader reader = 
RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) {
+            final StandardProvenanceEventRecord event = reader.nextRecord();
+            earliestEventTime = event.getEventTime();
+
+            try {
+                maxEventId = reader.getMaxEventId();
+            } catch (final IOException ioe) {
+                logger.warn("Unable to determine the maximum ID for Provenance 
Event Log File {}; values reported for the number of "
+                    + "events in the Provenance Repository may be 
inaccurate.", firstLogFile);
+            }
+        }
+
+        // check if we can delete the index safely.
+        if (latestTimestampOfFirstIndex <= earliestEventTime) {
+            // we can safely delete the first index because the latest event 
in the index is an event
+            // that has already been expired from the repository.
+            final File indexingDirectory = indexDirs.get(0);
+            indexManager.removeIndex(indexingDirectory);
+            indexConfig.removeIndexDirectory(indexingDirectory);
+            deleteDirectory(indexingDirectory);
+
+            if (maxEventId > -1L) {
+                indexConfig.setMinIdIndexed(maxEventId + 1L);
+            }
+        }
+
+        this.firstEventTimestamp = earliestEventTime;
+    }
+
+    private long determineFirstEventTimestamp() {
+        // Get the timestamp of the first event in the first Provenance Event 
Log File that can be read;
+        // a file whose read fails with an IOException is logged and skipped.
+        final List<File> logFiles = getSortedLogFiles();
+        if (logFiles.isEmpty()) {
+            return 0L;
+        }
+
+        for (final File logFile : logFiles) {
+            try (final RecordReader reader = 
RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
+                final StandardProvenanceEventRecord event = 
reader.nextRecord();
+                return event.getEventTime();
+            } catch (final IOException ioe) {
+                logger.warn("Failed to obtain timestamp of first event from 
Provenance Event Log File {}", logFile);
+            }
+        }
+
+        return 0L;
+    }
+
+    /**
+     * Recursively deletes the given directory. If unable to delete the 
directory, will emit a WARN level
+     * log event and move on.
+     *
+     * @param dir the directory to delete
+     */
+    private void deleteDirectory(final File dir) {
+        if (dir == null || !dir.exists()) {
+            return;
+        }
+
+        final File[] children = dir.listFiles();
+        if (children == null) {
+            return;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                deleteDirectory(child);
+            } else if (!child.delete()) {
+                logger.warn("Unable to remove index directory {}; this 
directory should be cleaned up manually", child.getAbsolutePath());
+            }
+        }
+
+        if (!dir.delete()) {
+            logger.warn("Unable to remove index directory {}; this directory 
should be cleaned up manually", dir);
+        }
+    }
+
+
+    /**
+     * @return a List of all Index directories, sorted by timestamp of the 
earliest event that could
+     *         be present in the index
+     */
+    private List<File> getAllIndexDirectories() {
+        final List<File> allIndexDirs = new ArrayList<>();
+        for (final File storageDir : configuration.getStorageDirectories()) {
+            final File[] indexDirs = storageDir.listFiles(new FilenameFilter() 
{
+                @Override
+                public boolean accept(final File dir, final String name) {
+                    return INDEX_PATTERN.matcher(name).matches();
+                }
+            });
+
+            if (indexDirs != null) {
+                for (final File indexDir : indexDirs) {
+                    allIndexDirs.add(indexDir);
+                }
+            }
+        }
+
+        Collections.sort(allIndexDirs, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                final long time1 = getIndexTimestamp(o1);
+                final long time2 = getIndexTimestamp(o2);
+                return Long.compare(time1, time2);
+            }
+        });
+
+        return allIndexDirs;
+    }
+
+    /**
+     * Takes a File that has a filename "index-" followed by a Long and 
returns the
+     * value of that Long
+     *
+     * @param indexDirectory the index directory to obtain the timestamp for
+     * @return the timestamp associated with the given index
+     */
+    private long getIndexTimestamp(final File indexDirectory) {
+        final String name = indexDirectory.getName();
+        final int dashIndex = name.indexOf("-");
+        return Long.parseLong(name.substring(dashIndex + 1));
     }
 
     /**
@@ -2011,6 +2176,37 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return false;
     }
 
+    /**
+     * @return a List of all Provenance Event Log Files, sorted in ascending 
order by the first Event ID in each file
+     */
+    private List<File> getSortedLogFiles() {
+        final List<Path> paths = new ArrayList<>(getAllLogFiles());
+        Collections.sort(paths, new Comparator<Path>() {
+            @Override
+            public int compare(final Path o1, final Path o2) {
+                return Long.compare(getFirstEventId(o1.toFile()), 
getFirstEventId(o2.toFile()));
+            }
+        });
+
+        final List<File> files = new ArrayList<>(paths.size());
+        for (final Path path : paths) {
+            files.add(path.toFile());
+        }
+        return files;
+    }
+
+    /**
+     * Returns the Event ID of the first event in the given Provenance Event 
Log File.
+     *
+     * @param logFile the log file from which to obtain the first Event ID
+     * @return the ID of the first event in the given log file
+     */
+    private long getFirstEventId(final File logFile) {
+        final String name = logFile.getName();
+        final int dotIndex = name.indexOf(".");
+        return Long.parseLong(name.substring(0, dotIndex));
+    }
+
     public Collection<Path> getAllLogFiles() {
         final SortedMap<Long, Path> map = idToPathMap.get();
         return map == null ? new ArrayList<Path>() : map.values();
@@ -2102,7 +2298,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         public void run() {
             try {
                 final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, 
maxAttributeChars);
-                final StandardQueryResult queryResult = search.search(query, 
retrievalCount);
+                final StandardQueryResult queryResult = search.search(query, 
retrievalCount, firstEventTimestamp);
                 submission.getResult().update(queryResult.getMatchingEvents(), 
queryResult.getTotalHitCount());
                 if (queryResult.isFinished()) {
                     logger.info("Successfully executed Query[{}] against Index 
{}; Search took {} milliseconds; Total Hits = {}",

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index eef4628..c00e0bf 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -30,17 +30,17 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,6 +108,7 @@ public class DocsReader {
     }
 
 
+    @SuppressWarnings("resource")
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final 
Collection<Path> allProvenanceLogFiles,
         final AtomicInteger retrievalCount, final int maxResults, final int 
maxAttributeChars) throws IOException {
         if (retrievalCount.get() >= maxResults) {
@@ -124,6 +125,7 @@ public class DocsReader {
         int logFileCount = 0;
 
         final Set<String> storageFilesToSkip = new HashSet<>();
+        int eventsReadThisFile = 0;
 
         try {
             for (final Document d : docs) {
@@ -135,6 +137,7 @@ public class DocsReader {
                 try {
                     if (reader != null && 
storageFilename.equals(lastStorageFilename)) {
                         matchingRecords.add(getRecord(d, reader));
+                        eventsReadThisFile++;
 
                         if ( retrievalCount.incrementAndGet() >= maxResults ) {
                             break;
@@ -162,8 +165,13 @@ public class DocsReader {
 
                         for (final File file : potentialFiles) {
                             try {
+                                if (reader != null) {
+                                    logger.debug("Read {} records from 
previous file", eventsReadThisFile);
+                                }
+
                                 reader = RecordReaders.newRecordReader(file, 
allProvenanceLogFiles, maxAttributeChars);
                                 matchingRecords.add(getRecord(d, reader));
+                                eventsReadThisFile = 1;
 
                                 if ( retrievalCount.incrementAndGet() >= 
maxResults ) {
                                     break;
@@ -183,6 +191,7 @@ public class DocsReader {
             }
         }
 
+        logger.debug("Read {} records from previous file", eventsReadThisFile);
         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
         logger.debug("Took {} ms to read {} events from {} prov log files", 
millis, matchingRecords.size(), logFileCount);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index c9bb238..7fcd8ab 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -48,7 +48,7 @@ public class IndexSearch {
         this.maxAttributeChars = maxAttributeChars;
     }
 
-    public StandardQueryResult search(final 
org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger 
retrievedCount) throws IOException {
+    public StandardQueryResult search(final 
org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger 
retrievedCount, final long firstEventTimestamp) throws IOException {
         if (!indexDirectory.exists() && !indexDirectory.mkdirs()) {
             throw new IOException("Unable to create Indexing Directory " + 
indexDirectory);
         }
@@ -59,6 +59,12 @@ public class IndexSearch {
         final StandardQueryResult sqr = new 
StandardQueryResult(provenanceQuery, 1);
         final Set<ProvenanceEventRecord> matchingRecords;
 
+        // we need to set the start date because if we do not, the first index 
may still have events that have aged off from
+        // the repository, and we don't want those events to count toward the 
total number of matches.
+        if (provenanceQuery.getStartDate() == null || 
provenanceQuery.getStartDate().getTime() < firstEventTimestamp) {
+            provenanceQuery.setStartDate(new Date(firstEventTimestamp));
+        }
+
         if (provenanceQuery.getEndDate() == null) {
             provenanceQuery.setEndDate(new Date());
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f72fa5e/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 713180f..e77710b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -934,11 +934,12 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testIndexDirectoryRemoved() throws InterruptedException, 
IOException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
+        config.setMaxRecordLife(5, TimeUnit.MINUTES);
         config.setMaxStorageCapacity(1024L * 1024L);
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setMaxEventFileCapacity(1024L * 1024L);
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10);  // force new index to be created for 
each rollover
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
         repo.initialize(getEventReporter());
@@ -960,11 +961,28 @@ public class TestPersistentProvenanceRepository {
         for (int i = 0; i < 10; i++) {
             attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
             builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(10L);  // make sure the events are destroyed 
when we call purge
             repo.registerEvent(builder.build());
         }
 
+        // wait for indexing to happen
         repo.waitForRollover();
 
+        Thread.sleep(2000L);
+
+        // add more records so that we will create a new index
+        final long secondBatchStartTime = System.currentTimeMillis();
+        for (int i = 0; i < 10; i++) {
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000001" + i);
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+        }
+
+        // wait for indexing to happen
+        repo.waitForRollover();
+
+        // verify we get the results expected
         final Query query = new Query(UUID.randomUUID().toString());
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, 
"file-*"));
         
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, 
"12?4"));
@@ -972,31 +990,30 @@ public class TestPersistentProvenanceRepository {
         query.setMaxResults(100);
 
         final QueryResult result = repo.queryEvents(query);
-        assertEquals(10, result.getMatchingEvents().size());
-
-        Thread.sleep(2000L);
+        assertEquals(20, result.getMatchingEvents().size());
 
-        // Ensure index directory exists
+        // Ensure index directories exist
         final FileFilter indexFileFilter = new FileFilter() {
             @Override
             public boolean accept(File pathname) {
                 return pathname.getName().startsWith("index");
             }
         };
-        File[] storageDirFiles = 
config.getStorageDirectories().get(0).listFiles(indexFileFilter);
-        assertEquals(1, storageDirFiles.length);
+        File[] indexDirs = 
config.getStorageDirectories().get(0).listFiles(indexFileFilter);
+        assertEquals(2, indexDirs.length);
 
-        config.setMaxStorageCapacity(100L);
-        config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
+        // expire old events and indexes
+        final long timeSinceSecondBatch = System.currentTimeMillis() - 
secondBatchStartTime;
+        config.setMaxRecordLife(timeSinceSecondBatch + 1000L, 
TimeUnit.MILLISECONDS);
         repo.purgeOldEvents();
         Thread.sleep(2000L);
 
         final QueryResult newRecordSet = repo.queryEvents(query);
-        assertTrue(newRecordSet.getMatchingEvents().isEmpty());
+        assertEquals(10, newRecordSet.getMatchingEvents().size());
 
-        // Ensure index directory is gone
-        storageDirFiles = 
config.getStorageDirectories().get(0).listFiles(indexFileFilter);
-        assertEquals(0, storageDirFiles.length);
+        // Ensure that one index directory is gone
+        indexDirs = 
config.getStorageDirectories().get(0).listFiles(indexFileFilter);
+        assertEquals(1, indexDirs.length);
     }
 
 

Reply via email to