Repository: nifi
Updated Branches:
  refs/heads/master bc5237593 -> 16348b071


NIFI-2452: Ensure that we do not close Index Readers that are still in use


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e9b87dd7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e9b87dd7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e9b87dd7

Branch: refs/heads/master
Commit: e9b87dd73436b1659b1fddcc400e7248bc00f1ee
Parents: bc52375
Author: Mark Payne <[email protected]>
Authored: Mon Aug 1 14:51:02 2016 -0400
Committer: joewitt <[email protected]>
Committed: Wed Aug 3 07:54:10 2016 -0700

----------------------------------------------------------------------
 .../nifi/provenance/IndexConfiguration.java     |   3 +-
 .../PersistentProvenanceRepository.java         |  20 ++-
 .../nifi/provenance/lucene/IndexManager.java    |  55 +++++--
 .../nifi/provenance/lucene/IndexSearch.java     |   3 +-
 .../nifi/provenance/lucene/LineageQuery.java    |   1 +
 .../TestPersistentProvenanceRepository.java     | 163 +++++++++++++++++++
 6 files changed, 223 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e9b87dd7/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index 4e80811..af7bff5 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -212,13 +212,14 @@ public class IndexConfiguration {
         final List<File> dirs = new ArrayList<>();
         lock.lock();
         try {
+            // Sort directories so that we return the newest index first
             final List<File> sortedIndexDirectories = getIndexDirectories();
             Collections.sort(sortedIndexDirectories, new Comparator<File>() {
                 @Override
                 public int compare(final File o1, final File o2) {
                     final long epochTimestamp1 = getIndexStartTime(o1);
                     final long epochTimestamp2 = getIndexStartTime(o2);
-                    return Long.compare(epochTimestamp1, epochTimestamp2);
+                    return Long.compare(epochTimestamp2, epochTimestamp1);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e9b87dd7/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index aee8277..87b617f 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -218,6 +218,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         rolloverExecutor = 
Executors.newScheduledThreadPool(numRolloverThreads, new 
NamedThreadFactory("Provenance Repository Rollover Thread"));
     }
 
+    protected IndexManager getIndexManager() {
+        return indexManager;
+    }
+
     @Override
     public void initialize(final EventReporter eventReporter, final Authorizer 
authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws 
IOException {
         writeLock.lock();
@@ -692,7 +696,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             rolloverExecutor.shutdownNow();
             queryExecService.shutdownNow();
 
-            indexManager.close();
+            getIndexManager().close();
 
             if ( writers != null ) {
                 for (final RecordWriter writer : writers) {
@@ -1054,7 +1058,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             // we can safely delete the first index because the latest event 
in the index is an event
             // that has already been expired from the repository.
             final File indexingDirectory = indexDirs.get(0);
-            indexManager.removeIndex(indexingDirectory);
+            getIndexManager().removeIndex(indexingDirectory);
             indexConfig.removeIndexDirectory(indexingDirectory);
             deleteDirectory(indexingDirectory);
 
@@ -1522,7 +1526,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                 logger.warn("Merged Journal File {} already exists; however, 
all partial journal files also exist "
                         + "so assuming that the merge did not finish. 
Repeating procedure in order to ensure consistency.");
 
-                final DeleteIndexAction deleteAction = new 
DeleteIndexAction(this, indexConfig, indexManager);
+                final DeleteIndexAction deleteAction = new 
DeleteIndexAction(this, indexConfig, getIndexManager());
                 try {
                     deleteAction.execute(suggestedMergeFile);
                 } catch (final Exception e) {
@@ -1658,7 +1662,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                 final AtomicBoolean finishedAdding = new AtomicBoolean(false);
                 final List<Future<?>> futures = new ArrayList<>();
 
-                final IndexWriter indexWriter = 
indexManager.borrowIndexWriter(indexingDirectory);
+                final IndexWriter indexWriter = 
getIndexManager().borrowIndexWriter(indexingDirectory);
                 try {
                     final ExecutorService exec = 
Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new 
ThreadFactory() {
                         @Override
@@ -1781,7 +1785,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                         }
                     }
                 } finally {
-                    indexManager.returnIndexWriter(indexingDirectory, 
indexWriter);
+                    getIndexManager().returnIndexWriter(indexingDirectory, 
indexWriter);
                 }
 
                 indexConfig.setMaxIdIndexed(maxId);
@@ -1984,7 +1988,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
      * @return an Iterator of ProvenanceEventRecord that match the query
      * @throws IOException if unable to perform the query
      */
-    public Iterator<ProvenanceEventRecord> queryLucene(final 
org.apache.lucene.search.Query luceneQuery) throws IOException {
+    Iterator<ProvenanceEventRecord> queryLucene(final 
org.apache.lucene.search.Query luceneQuery) throws IOException {
         final List<File> indexFiles = indexConfig.getIndexDirectories();
 
         final AtomicLong hits = new AtomicLong(0L);
@@ -2471,7 +2475,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         @Override
         public void run() {
             try {
-                final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, 
maxAttributeChars);
+                final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir, getIndexManager(), 
maxAttributeChars);
                 final StandardQueryResult queryResult = search.search(query, 
user, retrievalCount, firstEventTimestamp);
                 submission.getResult().update(queryResult.getMatchingEvents(), 
queryResult.getTotalHitCount());
             } catch (final Throwable t) {
@@ -2511,7 +2515,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
 
             try {
                 final Set<ProvenanceEventRecord> matchingRecords = 
LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
-                    indexManager, indexDir, null, flowFileUuids, 
maxAttributeChars);
+                    getIndexManager(), indexDir, null, flowFileUuids, 
maxAttributeChars);
 
                 final StandardLineageResult result = submission.getResult();
                 
result.update(replaceUnauthorizedWithPlaceholders(matchingRecords, user));

http://git-wip-us.apache.org/repos/asf/nifi/blob/e9b87dd7/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 57d0d78..07cd190 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -86,7 +87,7 @@ public class IndexManager implements Closeable {
 
     public IndexWriter borrowIndexWriter(final File indexingDirectory) throws 
IOException {
         final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.debug("Borrowing index writer for {}", indexingDirectory);
+        logger.trace("Borrowing index writer for {}", indexingDirectory);
 
         lock.lock();
         try {
@@ -124,6 +125,7 @@ public class IndexManager implements Closeable {
                 final List<ActiveIndexSearcher> searchers = 
activeSearchers.get(absoluteFile);
                 if ( searchers != null ) {
                     for (final ActiveIndexSearcher activeSearcher : searchers) 
{
+                        logger.debug("Poisoning {} because it is searching {}, 
which is getting updated", activeSearcher, indexingDirectory);
                         activeSearcher.poison();
                     }
                 }
@@ -141,7 +143,7 @@ public class IndexManager implements Closeable {
 
     public void returnIndexWriter(final File indexingDirectory, final 
IndexWriter writer) {
         final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.debug("Returning Index Writer for {} to IndexManager", 
indexingDirectory);
+        logger.trace("Returning Index Writer for {} to IndexManager", 
indexingDirectory);
 
         lock.lock();
         try {
@@ -154,7 +156,7 @@ public class IndexManager implements Closeable {
                     writer.close();
                 } else if ( count.getCount() <= 1 ) {
                     // we are finished with this writer.
-                    logger.debug("Closing Index Writer for {}", 
indexingDirectory);
+                    logger.debug("Decrementing count for Index Writer for {} 
to {}; Closing writer", indexingDirectory, count.getCount() - 1);
                     count.close();
                 } else {
                     // decrement the count.
@@ -175,7 +177,7 @@ public class IndexManager implements Closeable {
 
     public IndexSearcher borrowIndexSearcher(final File indexDir) throws 
IOException {
         final File absoluteFile = indexDir.getAbsoluteFile();
-        logger.debug("Borrowing index searcher for {}", indexDir);
+        logger.trace("Borrowing index searcher for {}", indexDir);
 
         lock.lock();
         try {
@@ -210,7 +212,8 @@ public class IndexManager implements Closeable {
                                 continue;
                             }
 
-                            logger.debug("Providing previously cached index 
searcher for {}", indexDir);
+                            final int referenceCount = 
searcher.incrementReferenceCount();
+                            logger.debug("Providing previously cached index 
searcher for {} and incrementing Reference Count to {}", indexDir, 
referenceCount);
                             return searcher.getSearcher();
                         }
                     }
@@ -219,7 +222,9 @@ public class IndexManager implements Closeable {
                     // from the cache so that we don't try to use them again 
later.
                     for ( final ActiveIndexSearcher searcher : expired ) {
                         try {
+                            logger.debug("Closing {}", searcher);
                             searcher.close();
+                            logger.trace("Closed {}", searcher);
                         } catch (final Exception e) {
                             logger.debug("Failed to close 'expired' 
IndexSearcher {}", searcher);
                         }
@@ -239,11 +244,14 @@ public class IndexManager implements Closeable {
                     final IndexSearcher searcher = new 
IndexSearcher(directoryReader);
 
                     // we want to cache the searcher that we create, since 
it's just a reader.
-                    final ActiveIndexSearcher cached = new 
ActiveIndexSearcher(searcher, directoryReader, directory, true);
+                    final ActiveIndexSearcher cached = new 
ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
                     currentlyCached.add(cached);
 
                     return cached.getSearcher();
                 } catch (final IOException e) {
+                    logger.error("Failed to create Index Searcher for {} due 
to {}", absoluteFile, e.toString());
+                    logger.error("", e);
+
                     try {
                         directory.close();
                     } catch (final IOException ioe) {
@@ -269,7 +277,7 @@ public class IndexManager implements Closeable {
 
                 // we don't want to cache this searcher because it's based on 
a writer, so we want to get
                 // new values the next time that we search.
-                final ActiveIndexSearcher activeSearcher = new 
ActiveIndexSearcher(searcher, directoryReader, null, false);
+                final ActiveIndexSearcher activeSearcher = new 
ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
 
                 currentlyCached.add(activeSearcher);
                 return activeSearcher.getSearcher();
@@ -282,7 +290,7 @@ public class IndexManager implements Closeable {
 
     public void returnIndexSearcher(final File indexDirectory, final 
IndexSearcher searcher) {
         final File absoluteFile = indexDirectory.getAbsoluteFile();
-        logger.debug("Returning index searcher for {} to IndexManager", 
indexDirectory);
+        logger.trace("Returning index searcher for {} to IndexManager", 
indexDirectory);
 
         lock.lock();
         try {
@@ -318,7 +326,8 @@ public class IndexManager implements Closeable {
                             return;
                         } else {
                             // the searcher is cached. Just leave it open.
-                            logger.debug("Index searcher for {} is cached; 
leaving open", indexDirectory);
+                            final int refCount = 
activeSearcher.decrementReferenceCount();
+                            logger.debug("Index searcher for {} is cached; 
leaving open with reference count of {}", indexDirectory, refCount);
                             return;
                         }
                     } else {
@@ -439,14 +448,17 @@ public class IndexManager implements Closeable {
     private static class ActiveIndexSearcher implements Closeable {
         private final IndexSearcher searcher;
         private final DirectoryReader directoryReader;
+        private final File indexDirectory;
         private final Directory directory;
         private final boolean cache;
-        private boolean poisoned = false;
+        private final AtomicInteger referenceCount = new AtomicInteger(1);
+        private volatile boolean poisoned = false;
 
-        public ActiveIndexSearcher(final IndexSearcher searcher, final 
DirectoryReader directoryReader,
+        public ActiveIndexSearcher(final IndexSearcher searcher, final File 
indexDirectory, final DirectoryReader directoryReader,
                 final Directory directory, final boolean cache) {
             this.searcher = searcher;
             this.directoryReader = directoryReader;
+            this.indexDirectory = indexDirectory;
             this.directory = directory;
             this.cache = cache;
         }
@@ -467,9 +479,28 @@ public class IndexManager implements Closeable {
             this.poisoned = true;
         }
 
+        public int incrementReferenceCount() {
+            return referenceCount.incrementAndGet();
+        }
+
+        public int decrementReferenceCount() {
+            return referenceCount.decrementAndGet();
+        }
+
         @Override
         public void close() throws IOException {
-            IndexManager.close(directoryReader, directory);
+            final int updatedRefCount = referenceCount.decrementAndGet();
+            if (updatedRefCount <= 0) {
+                logger.debug("Decremented Reference Count for {} to {}; 
closing underlying directory reader", this, updatedRefCount);
+                IndexManager.close(directoryReader, directory);
+            } else {
+                logger.debug("Decremented Reference Count for {} to {}; 
leaving underlying directory reader open", this, updatedRefCount);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "ActiveIndexSearcher[directory=" + indexDirectory + ", 
cached=" + cache + ", poisoned=" + poisoned + "]";
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e9b87dd7/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index 00e5f38..8d7df8b 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -92,11 +92,12 @@ public class IndexSearch {
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
 
+            logger.debug("Searching {} for {}", this, provenanceQuery);
             final TopDocs topDocs = searcher.search(luceneQuery, 
provenanceQuery.getMaxResults());
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
 
-            logger.debug("Searching {} took {} millis; opening searcher took 
{} millis", this,
+            logger.debug("Searching {} for {} took {} millis; opening searcher 
took {} millis", this, provenanceQuery,
                     TimeUnit.NANOSECONDS.toMillis(searchNanos), 
TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
 
             if (topDocs.totalHits == 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e9b87dd7/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 2706082..1b13504 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -74,6 +74,7 @@ public class LineageQuery {
                 }
 
                 final long searchStart = System.nanoTime();
+                logger.debug("Searching {} for {}", indexDirectory, 
flowFileIdQuery);
                 final TopDocs uuidQueryTopDocs = 
searcher.search(flowFileIdQuery, MAX_QUERY_RESULTS);
                 final long searchEnd = System.nanoTime();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e9b87dd7/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index d7738e7..b78dfcd 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -36,6 +36,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.IndexManager;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
@@ -59,6 +60,8 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileFilter;
@@ -71,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -481,6 +485,165 @@ public class TestPersistentProvenanceRepository {
         assertTrue(newRecordSet.getMatchingEvents().isEmpty());
     }
 
+    // TODO: Switch to 10,000.
+    @Test(timeout = 1000000)
+    public void testModifyIndexWhileSearching() throws IOException, 
InterruptedException, ParseException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L * 10);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L * 10);
+        config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
+
+        final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            private IndexManager wrappedManager = null;
+
+            // Create an IndexManager that adds a delay before returning the 
Index Searcher.
+            @Override
+            protected synchronized IndexManager getIndexManager() {
+                if (wrappedManager == null) {
+                    final IndexManager mgr = super.getIndexManager();
+                    final Logger logger = 
LoggerFactory.getLogger("IndexManager");
+
+                    wrappedManager = new IndexManager() {
+                        final AtomicInteger indexSearcherCount = new 
AtomicInteger(0);
+
+                        @Override
+                        public IndexSearcher borrowIndexSearcher(File 
indexDir) throws IOException {
+                            final IndexSearcher searcher = 
mgr.borrowIndexSearcher(indexDir);
+                            final int idx = 
indexSearcherCount.incrementAndGet();
+                            obtainIndexSearcherLatch.countDown();
+
+                            // The first searcher should sleep for 3 seconds. 
The second searcher should
+                            // sleep for 5 seconds. This allows us to have two 
threads each obtain a Searcher
+                            // and then have one of them finish searching and 
close the searcher if it's poisoned while the
+                            // second thread is still holding the searcher
+                            try {
+                                if (idx == 1) {
+                                    Thread.sleep(3000L);
+                                } else {
+                                    Thread.sleep(5000L);
+                                }
+                            } catch (InterruptedException e) {
+                                throw new IOException("Interrupted", e);
+                            }
+
+                            logger.info("Releasing index searcher");
+                            return searcher;
+                        }
+
+                        @Override
+                        public IndexWriter borrowIndexWriter(File 
indexingDirectory) throws IOException {
+                            return mgr.borrowIndexWriter(indexingDirectory);
+                        }
+
+                        @Override
+                        public void close() throws IOException {
+                            mgr.close();
+                        }
+
+                        @Override
+                        public void removeIndex(File indexDirectory) {
+                            mgr.removeIndex(indexDirectory);
+                        }
+
+                        @Override
+                        public void returnIndexSearcher(File indexDirectory, 
IndexSearcher searcher) {
+                            mgr.returnIndexSearcher(indexDirectory, searcher);
+                        }
+
+                        @Override
+                        public void returnIndexWriter(File indexingDirectory, 
IndexWriter writer) {
+                            mgr.returnIndexWriter(indexingDirectory, writer);
+                        }
+                    };
+                }
+
+                return wrappedManager;
+            }
+        };
+
+        repo.initialize(getEventReporter(), null, null);
+
+        final String uuid = "10000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        repo.waitForRollover();
+
+        // Perform a query. This will ensure that an IndexSearcher is created 
and cached.
+        final Query query = new Query(UUID.randomUUID().toString());
+        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, 
"file-*"));
+        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, 
"12?4"));
+        
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, 
"nifi://*"));
+        query.setMaxResults(100);
+
+        // Run a query in a background thread. When this thread goes to obtain 
the IndexSearcher, it will have a 5 second delay.
+        // That delay will occur as the main thread is updating the index. 
This should result in the search creating a new Index Reader
+        // that can properly query the index.
+        final int numThreads = 2;
+        final CountDownLatch performSearchLatch = new 
CountDownLatch(numThreads);
+        final Runnable searchRunnable = new Runnable() {
+            @Override
+            public void run() {
+                QueryResult result;
+                try {
+                    result = repo.queryEvents(query, createUser());
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                    return;
+                }
+
+                System.out.println("Finished search: " + result);
+                performSearchLatch.countDown();
+            }
+        };
+
+        // Kick off the searcher threads
+        for (int i = 0; i < numThreads; i++) {
+            final Thread searchThread = new Thread(searchRunnable);
+            searchThread.start();
+        }
+
+        // Wait until we've obtained the Index Searchers before modifying the 
index.
+        obtainIndexSearcherLatch.await();
+
+        // add more events to the repo
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        // Force a rollover to occur. This will modify the index.
+        repo.rolloverWithLock(true);
+
+        // Wait for the repository to roll over.
+        repo.waitForRollover();
+
+        // Wait for the searches to complete.
+        performSearchLatch.await();
+    }
+
     @Test
     public void 
testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws 
IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();

Reply via email to