http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
new file mode 100644
index 0000000..3943504
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexManager implements Closeable {
+       private static final Logger logger = 
LoggerFactory.getLogger(IndexManager.class);
+       
+       private final Lock lock = new ReentrantLock();
+       private final Map<File, IndexWriterCount> writerCounts = new 
HashMap<>();
+       private final Map<File, List<ActiveIndexSearcher>> activeSearchers = 
new HashMap<>();
+       
+       
+       public void removeIndex(final File indexDirectory) {
+               final File absoluteFile = indexDirectory.getAbsoluteFile();
+               logger.info("Removing index {}", indexDirectory);
+               
+               lock.lock();
+               try {
+                       final IndexWriterCount count = 
writerCounts.remove(absoluteFile);
+                       if ( count != null ) {
+                               try {
+                                       count.close();
+                               } catch (final IOException ioe) {
+                                       logger.warn("Failed to close Index 
Writer {} for {}", count.getWriter(), absoluteFile);
+                                       if ( logger.isDebugEnabled() ) {
+                                               logger.warn("", ioe);
+                                       }
+                               }
+                       }
+                       
+                       for ( final List<ActiveIndexSearcher> searcherList : 
activeSearchers.values() ) {
+                               for ( final ActiveIndexSearcher searcher : 
searcherList ) {
+                                       try {
+                                               searcher.close();
+                                       } catch (final IOException ioe) {
+                                               logger.warn("Failed to close 
Index Searcher {} for {} due to {}", 
+                                                               
searcher.getSearcher(), absoluteFile, ioe);
+                                               if ( logger.isDebugEnabled() ) {
+                                                       logger.warn("", ioe);
+                                               }
+                                       }
+                               }
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+       
+       public IndexWriter borrowIndexWriter(final File indexingDirectory) 
throws IOException {
+               final File absoluteFile = indexingDirectory.getAbsoluteFile();
+               logger.debug("Borrowing index writer for {}", 
indexingDirectory);
+               
+               lock.lock();
+               try {
+                       IndexWriterCount writerCount = 
writerCounts.remove(absoluteFile);
+                       if ( writerCount == null ) {
+                               final List<Closeable> closeables = new 
ArrayList<>();
+                final Directory directory = 
FSDirectory.open(indexingDirectory);
+                closeables.add(directory);
+                
+                try {
+                       final Analyzer analyzer = new StandardAnalyzer();
+                       closeables.add(analyzer);
+                       
+                    final IndexWriterConfig config = new 
IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+                    config.setWriteLockTimeout(300000L);
+
+                    final IndexWriter indexWriter = new IndexWriter(directory, 
config);
+                    writerCount = new IndexWriterCount(indexWriter, analyzer, 
directory, 1);
+                    logger.debug("Providing new index writer for {}", 
indexingDirectory);
+                } catch (final IOException ioe) {
+                       for ( final Closeable closeable : closeables ) {
+                               try {
+                                       closeable.close();
+                               } catch (final IOException ioe2) {
+                                       ioe.addSuppressed(ioe2);
+                               }
+                       }
+                       
+                       throw ioe;
+                }
+                
+                writerCounts.put(absoluteFile, writerCount);
+                       } else {
+                               logger.debug("Providing existing index writer 
for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() 
+ 1);
+                               writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
+                                               writerCount.getAnalyzer(), 
writerCount.getDirectory(), writerCount.getCount() + 1));
+                       }
+                       
+                       return writerCount.getWriter();
+               } finally {
+                       lock.unlock();
+               }
+       }
+       
+       public void returnIndexWriter(final File indexingDirectory, final 
IndexWriter writer) {
+               final File absoluteFile = indexingDirectory.getAbsoluteFile();
+               logger.debug("Returning Index Writer for {} to IndexManager", 
indexingDirectory);
+               
+               lock.lock();
+               try {
+                       IndexWriterCount count = 
writerCounts.remove(absoluteFile);
+                       
+                       try {
+                               if ( count == null ) {
+                                       logger.warn("Index Writer {} was 
returned to IndexManager for {}, but this writer is not known. "
+                                                       + "This could 
potentially lead to a resource leak", writer, indexingDirectory);
+                                       writer.close();
+                               } else if ( count.getCount() <= 1 ) {
+                                       // we are finished with this writer.
+                                       logger.debug("Closing Index Writer for 
{}", indexingDirectory);
+                                       count.close();
+                               } else {
+                                       // decrement the count.
+                                       logger.debug("Decrementing count for 
Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+                                       writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), 
count.getCount() - 1));
+                               }
+                       } catch (final IOException ioe) {
+                               logger.warn("Failed to close Index Writer {} 
due to {}", writer, ioe);
+                               if ( logger.isDebugEnabled() ) {
+                                       logger.warn("", ioe);
+                               }
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       
+       public IndexSearcher borrowIndexSearcher(final File indexDir) throws 
IOException {
+               final File absoluteFile = indexDir.getAbsoluteFile();
+               logger.debug("Borrowing index searcher for {}", indexDir);
+               
+               lock.lock();
+               try {
+                       // check if we already have a reader cached.
+                       List<ActiveIndexSearcher> currentlyCached = 
activeSearchers.get(absoluteFile);
+                       if ( currentlyCached == null ) {
+                               currentlyCached = new ArrayList<>();
+                               activeSearchers.put(absoluteFile, 
currentlyCached);
+                       } else {
+                               // keep track of any searchers that have been 
closed so that we can remove them
+                               // from our cache later.
+                               final Set<ActiveIndexSearcher> expired = new 
HashSet<>();
+                               
+                               try {
+                                       for ( final ActiveIndexSearcher 
searcher : currentlyCached ) {
+                                               if ( searcher.isCache() ) {
+                                                       final int refCount = 
searcher.getSearcher().getIndexReader().getRefCount();
+                                                       if ( refCount <= 0 ) {
+                                                               // if refCount 
== 0, then the reader has been closed, so we need to discard the searcher
+                                                               
logger.debug("Reference count for cached Index Searcher for {} is currently {}; 
"
+                                                                       + 
"removing cached searcher", absoluteFile, refCount);
+                                                               
expired.add(searcher);
+                                                               continue;
+                                                       }
+                                                       
+                                                       logger.debug("Providing 
previously cached index searcher for {}", indexDir);
+                                                       return 
searcher.getSearcher();
+                                               }
+                                       }
+                               } finally {
+                                       // if we have any expired index 
searchers, we need to close them and remove them
+                                       // from the cache so that we don't try 
to use them again later.
+                                       for ( final ActiveIndexSearcher 
searcher : expired ) {
+                                               try {
+                                                       searcher.close();
+                                               } catch (final Exception e) {
+                                                       logger.debug("Failed to 
close 'expired' IndexSearcher {}", searcher);
+                                               }
+                                               
+                                               
currentlyCached.remove(searcher);
+                                       }
+                               }
+                       }
+                       
+                       IndexWriterCount writerCount = 
writerCounts.remove(absoluteFile);
+                       if ( writerCount == null ) {
+                               final Directory directory = 
FSDirectory.open(absoluteFile);
+                               logger.debug("No Index Writer currently exists 
for {}; creating a cachable reader", indexDir);
+                               
+                               try {
+                                       final DirectoryReader directoryReader = 
DirectoryReader.open(directory);
+                                       final IndexSearcher searcher = new 
IndexSearcher(directoryReader);
+                                       
+                                       // we want to cache the searcher that 
we create, since it's just a reader.
+                                       final ActiveIndexSearcher cached = new 
ActiveIndexSearcher(searcher, directoryReader, directory, true);
+                                       currentlyCached.add(cached);
+                                       
+                                       return cached.getSearcher();
+                               } catch (final IOException e) {
+                                       try {
+                                               directory.close();
+                                       } catch (final IOException ioe) {
+                                               e.addSuppressed(ioe);
+                                       }
+                                       
+                                       throw e;
+                               }
+                       } else {
+                               logger.debug("Index Writer currently exists for 
{}; creating a non-cachable reader and incrementing "
+                                               + "counter to {}", indexDir, 
writerCount.getCount() + 1);
+
+                               // increment the writer count to ensure that 
it's kept open.
+                               writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
+                                               writerCount.getAnalyzer(), 
writerCount.getDirectory(), writerCount.getCount() + 1));
+                               
+                               // create a new Index Searcher from the writer 
so that we don't have an issue with trying
+                               // to read from a directory that's locked. If 
we get the "no segments* file found" with
+                               // Lucene, this indicates that an IndexWriter 
already has the directory open.
+                               final IndexWriter writer = 
writerCount.getWriter();
+                               final DirectoryReader directoryReader = 
DirectoryReader.open(writer, false);
+                               final IndexSearcher searcher = new 
IndexSearcher(directoryReader);
+                               
+                               // we don't want to cache this searcher because 
it's based on a writer, so we want to get
+                               // new values the next time that we search.
+                               final ActiveIndexSearcher activeSearcher = new 
ActiveIndexSearcher(searcher, directoryReader, null, false);
+                               
+                               currentlyCached.add(activeSearcher);
+                               return activeSearcher.getSearcher();
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+       
+       
+       public void returnIndexSearcher(final File indexDirectory, final 
IndexSearcher searcher) {
+               final File absoluteFile = indexDirectory.getAbsoluteFile();
+               logger.debug("Returning index searcher for {} to IndexManager", 
indexDirectory);
+               
+               lock.lock();
+               try {
+                       // check if we already have a reader cached.
+                       List<ActiveIndexSearcher> currentlyCached = 
activeSearchers.get(absoluteFile);
+                       if ( currentlyCached == null ) {
+                               logger.warn("Received Index Searcher for {} but 
no searcher was provided for that directory; this could "
+                                               + "result in a resource leak", 
indexDirectory);
+                               return;
+                       }
+                       
+                       final Iterator<ActiveIndexSearcher> itr = 
currentlyCached.iterator();
+                       while (itr.hasNext()) {
+                               final ActiveIndexSearcher activeSearcher = 
itr.next();
+                               if ( 
activeSearcher.getSearcher().equals(searcher) ) {
+                                       if ( activeSearcher.isCache() ) {
+                                               // the searcher is cached. Just 
leave it open.
+                                               logger.debug("Index searcher 
for {} is cached; leaving open", indexDirectory);
+                                               return;
+                                       } else {
+                                               // searcher is not cached. It 
was created from a writer, and we want
+                                               // the newest updates the next 
time that we get a searcher, so we will
+                                               // go ahead and close this one 
out.
+                                               itr.remove();
+                                               
+                                               // decrement the writer count 
because we incremented it when creating the searcher
+                                               final IndexWriterCount 
writerCount = writerCounts.remove(absoluteFile);
+                                               if ( writerCount != null ) {
+                                                       if ( 
writerCount.getCount() <= 1 ) {
+                                                               try {
+                                                                       
logger.debug("Index searcher for {} is not cached. Writer count is "
+                                                                               
        + "decremented to {}; closing writer", indexDirectory, 
writerCount.getCount() - 1);
+                                                                       
+                                                                       
writerCount.close();
+                                                               } catch (final 
IOException ioe) {
+                                                                       
logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+                                                                       if ( 
logger.isDebugEnabled() ) {
+                                                                               
logger.warn("", ioe);
+                                                                       }
+                                                               }
+                                                       } else {
+                                                               
logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+                                                                               
+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+                                                               
+                                                               
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                                                                       
writerCount.getAnalyzer(), writerCount.getDirectory(), 
+                                                                       
writerCount.getCount() - 1));
+                                                       }
+                                               }
+
+                                               try {
+                                                       logger.debug("Closing 
Index Searcher for {}", indexDirectory);
+                                                       activeSearcher.close();
+                                               } catch (final IOException ioe) 
{
+                                                       logger.warn("Failed to 
close Index Searcher for {} due to {}", absoluteFile, ioe);
+                                                       if ( 
logger.isDebugEnabled() ) {
+                                                               logger.warn("", 
ioe);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+       
+       @Override
+       public void close() throws IOException {
+               logger.debug("Closing Index Manager");
+               
+               lock.lock();
+               try {
+                       IOException ioe = null;
+                       
+                       for ( final IndexWriterCount count : 
writerCounts.values() ) {
+                               try {
+                                       count.close();
+                               } catch (final IOException e) {
+                                       if ( ioe == null ) {
+                                               ioe = e;
+                                       } else {
+                                               ioe.addSuppressed(e);
+                                       }
+                               }
+                       }
+                       
+                       for (final List<ActiveIndexSearcher> searcherList : 
activeSearchers.values()) {
+                               for (final ActiveIndexSearcher searcher : 
searcherList) {
+                                       try {
+                                               searcher.close();
+                                       } catch (final IOException e) {
+                                               if ( ioe == null ) {
+                                                       ioe = e;
+                                               } else {
+                                                       ioe.addSuppressed(e);
+                                               }
+                                       }
+                               }
+                       }
+                       
+                       if ( ioe != null ) {
+                               throw ioe;
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       
+       private static void close(final Closeable... closeables) throws 
IOException {
+               IOException ioe = null;
+               for ( final Closeable closeable : closeables ) {
+                       if ( closeable == null ) {
+                               continue;
+                       }
+                       
+                       try {
+                               closeable.close();
+                       } catch (final IOException e) {
+                               if ( ioe == null ) {
+                                       ioe = e;
+                               } else {
+                                       ioe.addSuppressed(e);
+                               }
+                       }
+               }
+               
+               if ( ioe != null ) {
+                       throw ioe;
+               }
+       }
+       
+       
+       private static class ActiveIndexSearcher implements Closeable {
+               private final IndexSearcher searcher;
+               private final DirectoryReader directoryReader;
+               private final Directory directory;
+               private final boolean cache;
+               
+               public ActiveIndexSearcher(IndexSearcher searcher, 
DirectoryReader directoryReader, 
+                               Directory directory, final boolean cache) {
+                       this.searcher = searcher;
+                       this.directoryReader = directoryReader;
+                       this.directory = directory;
+                       this.cache = cache;
+               }
+
+               public boolean isCache() {
+                       return cache;
+               }
+
+               public IndexSearcher getSearcher() {
+                       return searcher;
+               }
+               
+               @Override
+               public void close() throws IOException {
+                       IndexManager.close(directoryReader, directory);
+               }
+       }
+       
+       
+       private static class IndexWriterCount implements Closeable {
+               private final IndexWriter writer;
+               private final Analyzer analyzer;
+               private final Directory directory;
+               private final int count;
+               
+               public IndexWriterCount(final IndexWriter writer, final 
Analyzer analyzer, final Directory directory, final int count) {
+                       this.writer = writer;
+                       this.analyzer = analyzer;
+                       this.directory = directory;
+                       this.count = count;
+               }
+
+               public Analyzer getAnalyzer() {
+                       return analyzer;
+               }
+
+               public Directory getDirectory() {
+                       return directory;
+               }
+
+               public IndexWriter getWriter() {
+                       return writer;
+               }
+
+               public int getCount() {
+                       return count;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       IndexManager.close(writer, analyzer, directory);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index e2854c3..dcb6e08 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -17,31 +17,33 @@
 package org.apache.nifi.provenance.lucene;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.StandardQueryResult;
-
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexNotFoundException;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardQueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IndexSearch {
-
+       private final Logger logger = 
LoggerFactory.getLogger(IndexSearch.class);
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
+    private final IndexManager indexManager;
 
-    public IndexSearch(final PersistentProvenanceRepository repo, final File 
indexDirectory) {
+    public IndexSearch(final PersistentProvenanceRepository repo, final File 
indexDirectory, final IndexManager indexManager) {
         this.repository = repo;
         this.indexDirectory = indexDirectory;
+        this.indexManager = indexManager;
     }
 
     public StandardQueryResult search(final 
org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger 
retrievedCount) throws IOException {
@@ -55,30 +57,57 @@ public class IndexSearch {
         final StandardQueryResult sqr = new 
StandardQueryResult(provenanceQuery, 1);
         final Set<ProvenanceEventRecord> matchingRecords;
 
-        try (final DirectoryReader directoryReader = 
DirectoryReader.open(FSDirectory.open(indexDirectory))) {
-            final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-            if (provenanceQuery.getEndDate() == null) {
-                provenanceQuery.setEndDate(new Date());
-            }
-            final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
+        if (provenanceQuery.getEndDate() == null) {
+            provenanceQuery.setEndDate(new Date());
+        }
+        final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
 
-            TopDocs topDocs = searcher.search(luceneQuery, 
provenanceQuery.getMaxResults());
+        final long start = System.nanoTime();
+        IndexSearcher searcher = null;
+        try {
+               searcher = indexManager.borrowIndexSearcher(indexDirectory);
+            final long searchStartNanos = System.nanoTime();
+            final long openSearcherNanos = searchStartNanos - start;
+            
+            final TopDocs topDocs = searcher.search(luceneQuery, 
provenanceQuery.getMaxResults());
+            final long finishSearch = System.nanoTime();
+            final long searchNanos = finishSearch - searchStartNanos;
+            
+            logger.debug("Searching {} took {} millis; opening searcher took 
{} millis", this, 
+                       TimeUnit.NANOSECONDS.toMillis(searchNanos), 
TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+            
             if (topDocs.totalHits == 0) {
                 sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
                 return sqr;
             }
 
             final DocsReader docsReader = new 
DocsReader(repository.getConfiguration().getStorageDirectories());
-            matchingRecords = docsReader.read(topDocs, directoryReader, 
repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+            matchingRecords = docsReader.read(topDocs, 
searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, 
provenanceQuery.getMaxResults());
+            
+            final long readRecordsNanos = System.nanoTime() - finishSearch;
+            logger.debug("Reading {} records took {} millis for {}", 
matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
+            
             sqr.update(matchingRecords, topDocs.totalHits);
             return sqr;
-        } catch (final IndexNotFoundException e) {
-            // nothing has been indexed yet.
+        } catch (final FileNotFoundException e) {
+            // nothing has been indexed yet, or the data has already aged off
+               logger.warn("Attempted to search Provenance Index {} but could 
not find the file due to {}", indexDirectory, e);
+               if ( logger.isDebugEnabled() ) {
+                       logger.warn("", e);
+               }
+               
             sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
             return sqr;
+        } finally {
+               if ( searcher != null ) {
+                       indexManager.returnIndexSearcher(indexDirectory, 
searcher);
+               }
         }
     }
 
+    
+    @Override
+    public String toString() {
+       return "IndexSearcher[" + indexDirectory + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 214267a..5e87913 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -24,27 +24,27 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.provenance.SearchableFields;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
-import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.IndexConfiguration;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.rollover.RolloverAction;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,15 +72,93 @@ public class IndexingAction implements RolloverAction {
         doc.add(new StringField(field.getSearchableFieldName(), 
value.toLowerCase(), store));
     }
 
+    
+    public void index(final StandardProvenanceEventRecord record, final 
IndexWriter indexWriter, final Integer blockIndex) throws IOException {
+        final Map<String, String> attributes = record.getAttributes();
+
+        final Document doc = new Document();
+        addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), 
Store.NO);
+        addField(doc, SearchableFields.Filename, 
attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+        addField(doc, SearchableFields.ComponentID, record.getComponentId(), 
Store.NO);
+        addField(doc, SearchableFields.AlternateIdentifierURI, 
record.getAlternateIdentifierUri(), Store.NO);
+        addField(doc, SearchableFields.EventType, 
record.getEventType().name(), Store.NO);
+        addField(doc, SearchableFields.Relationship, record.getRelationship(), 
Store.NO);
+        addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimSection, 
record.getContentClaimSection(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimContainer, 
record.getContentClaimContainer(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimIdentifier, 
record.getContentClaimIdentifier(), Store.NO);
+        addField(doc, SearchableFields.SourceQueueIdentifier, 
record.getSourceQueueIdentifier(), Store.NO);
+
+        if 
(nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
+            addField(doc, SearchableFields.TransitURI, record.getTransitUri(), 
Store.NO);
+        }
+
+        for (final SearchableField searchableField : 
attributeSearchableFields) {
+            addField(doc, searchableField, 
attributes.get(searchableField.getSearchableFieldName()), Store.NO);
+        }
+
+        final String storageFilename = 
LuceneUtil.substringBefore(record.getStorageFilename(), ".");
+
+        // Index the fields that we always index (unless there's nothing else 
to index at all)
+        if (!doc.getFields().isEmpty()) {
+            doc.add(new 
LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), 
record.getLineageStartDate(), Store.NO));
+            doc.add(new 
LongField(SearchableFields.EventTime.getSearchableFieldName(), 
record.getEventTime(), Store.NO));
+            doc.add(new 
LongField(SearchableFields.FileSize.getSearchableFieldName(), 
record.getFileSize(), Store.NO));
+            doc.add(new StringField(FieldNames.STORAGE_FILENAME, 
storageFilename, Store.YES));
+            
+            if ( blockIndex == null ) {
+               doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, 
record.getStorageByteOffset(), Store.YES));
+            } else {
+                   doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, 
Store.YES));
+                   doc.add(new 
LongField(SearchableFields.Identifier.getSearchableFieldName(), 
record.getEventId(), Store.YES));
+            }
+            
+            for (final String lineageIdentifier : 
record.getLineageIdentifiers()) {
+                addField(doc, SearchableFields.LineageIdentifier, 
lineageIdentifier, Store.NO);
+            }
+
+            // If it's event is a FORK, or JOIN, add the FlowFileUUID for all 
child/parent UUIDs.
+            if (record.getEventType() == ProvenanceEventType.FORK || 
record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == 
ProvenanceEventType.REPLAY) {
+                for (final String uuid : record.getChildUuids()) {
+                    if (!uuid.equals(record.getFlowFileUuid())) {
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, 
Store.NO);
+                    }
+                }
+            } else if (record.getEventType() == ProvenanceEventType.JOIN) {
+                for (final String uuid : record.getParentUuids()) {
+                    if (!uuid.equals(record.getFlowFileUuid())) {
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, 
Store.NO);
+                    }
+                }
+            } else if (record.getEventType() == ProvenanceEventType.RECEIVE && 
record.getSourceSystemFlowFileIdentifier() != null) {
+                // If we get a receive with a Source System FlowFile 
Identifier, we add another Document that shows the UUID
+                // that the Source System uses to refer to the data.
+                final String sourceIdentifier = 
record.getSourceSystemFlowFileIdentifier();
+                final String sourceFlowFileUUID;
+                final int lastColon = sourceIdentifier.lastIndexOf(":");
+                if (lastColon > -1 && lastColon < sourceIdentifier.length() - 
2) {
+                    sourceFlowFileUUID = sourceIdentifier.substring(lastColon 
+ 1);
+                } else {
+                    sourceFlowFileUUID = null;
+                }
+
+                if (sourceFlowFileUUID != null) {
+                    addField(doc, SearchableFields.FlowFileUUID, 
sourceFlowFileUUID, Store.NO);
+                }
+            }
+
+            indexWriter.addDocument(doc);
+        }
+    }
+    
     @Override
-    @SuppressWarnings("deprecation")
     public File execute(final File fileRolledOver) throws IOException {
         final File indexingDirectory = 
indexConfiguration.getWritableIndexDirectory(fileRolledOver);
         int indexCount = 0;
         long maxId = -1L;
 
         try (final Directory directory = FSDirectory.open(indexingDirectory);
-                final Analyzer analyzer = new 
StandardAnalyzer(LuceneUtil.LUCENE_VERSION)) {
+                final Analyzer analyzer = new StandardAnalyzer()) {
 
             final IndexWriterConfig config = new 
IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
             config.setWriteLockTimeout(300000L);
@@ -89,6 +167,13 @@ public class IndexingAction implements RolloverAction {
                     final RecordReader reader = 
RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
                 StandardProvenanceEventRecord record;
                 while (true) {
+                       final Integer blockIndex;
+                       if ( reader.isBlockIndexAvailable() ) {
+                               blockIndex = reader.getBlockIndex();
+                       } else {
+                               blockIndex = null;
+                       }
+                       
                     try {
                         record = reader.nextRecord();
                     } catch (final EOFException eof) {
@@ -104,76 +189,8 @@ public class IndexingAction implements RolloverAction {
 
                     maxId = record.getEventId();
 
-                    final Map<String, String> attributes = 
record.getAttributes();
-
-                    final Document doc = new Document();
-                    addField(doc, SearchableFields.FlowFileUUID, 
record.getFlowFileUuid(), Store.NO);
-                    addField(doc, SearchableFields.Filename, 
attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
-                    addField(doc, SearchableFields.ComponentID, 
record.getComponentId(), Store.NO);
-                    addField(doc, SearchableFields.AlternateIdentifierURI, 
record.getAlternateIdentifierUri(), Store.NO);
-                    addField(doc, SearchableFields.EventType, 
record.getEventType().name(), Store.NO);
-                    addField(doc, SearchableFields.Relationship, 
record.getRelationship(), Store.NO);
-                    addField(doc, SearchableFields.Details, 
record.getDetails(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimSection, 
record.getContentClaimSection(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimContainer, 
record.getContentClaimContainer(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimIdentifier, 
record.getContentClaimIdentifier(), Store.NO);
-                    addField(doc, SearchableFields.SourceQueueIdentifier, 
record.getSourceQueueIdentifier(), Store.NO);
-
-                    if 
(nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
-                        addField(doc, SearchableFields.TransitURI, 
record.getTransitUri(), Store.NO);
-                    }
-
-                    for (final SearchableField searchableField : 
attributeSearchableFields) {
-                        addField(doc, searchableField, 
attributes.get(searchableField.getSearchableFieldName()), Store.NO);
-                    }
-
-                    final String storageFilename = 
LuceneUtil.substringBefore(record.getStorageFilename(), ".");
-
-                    // Index the fields that we always index (unless there's 
nothing else to index at all)
-                    if (!doc.getFields().isEmpty()) {
-                        doc.add(new 
LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), 
record.getLineageStartDate(), Store.NO));
-                        doc.add(new 
LongField(SearchableFields.EventTime.getSearchableFieldName(), 
record.getEventTime(), Store.NO));
-                        doc.add(new 
LongField(SearchableFields.FileSize.getSearchableFieldName(), 
record.getFileSize(), Store.NO));
-                        doc.add(new StringField(FieldNames.STORAGE_FILENAME, 
storageFilename, Store.YES));
-                        doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, 
record.getStorageByteOffset(), Store.YES));
-
-                        for (final String lineageIdentifier : 
record.getLineageIdentifiers()) {
-                            addField(doc, SearchableFields.LineageIdentifier, 
lineageIdentifier, Store.NO);
-                        }
-
-                        // If it's event is a FORK, or JOIN, add the 
FlowFileUUID for all child/parent UUIDs.
-                        if (record.getEventType() == ProvenanceEventType.FORK 
|| record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() 
== ProvenanceEventType.REPLAY) {
-                            for (final String uuid : record.getChildUuids()) {
-                                if (!uuid.equals(record.getFlowFileUuid())) {
-                                    addField(doc, 
SearchableFields.FlowFileUUID, uuid, Store.NO);
-                                }
-                            }
-                        } else if (record.getEventType() == 
ProvenanceEventType.JOIN) {
-                            for (final String uuid : record.getParentUuids()) {
-                                if (!uuid.equals(record.getFlowFileUuid())) {
-                                    addField(doc, 
SearchableFields.FlowFileUUID, uuid, Store.NO);
-                                }
-                            }
-                        } else if (record.getEventType() == 
ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != 
null) {
-                            // If we get a receive with a Source System 
FlowFile Identifier, we add another Document that shows the UUID
-                            // that the Source System uses to refer to the 
data.
-                            final String sourceIdentifier = 
record.getSourceSystemFlowFileIdentifier();
-                            final String sourceFlowFileUUID;
-                            final int lastColon = 
sourceIdentifier.lastIndexOf(":");
-                            if (lastColon > -1 && lastColon < 
sourceIdentifier.length() - 2) {
-                                sourceFlowFileUUID = 
sourceIdentifier.substring(lastColon + 1);
-                            } else {
-                                sourceFlowFileUUID = null;
-                            }
-
-                            if (sourceFlowFileUUID != null) {
-                                addField(doc, SearchableFields.FlowFileUUID, 
sourceFlowFileUUID, Store.NO);
-                            }
-                        }
-
-                        indexWriter.addDocument(doc);
-                        indexCount++;
-                    }
+                    index(record, indexWriter, blockIndex);
+                    indexCount++;
                 }
 
                 indexWriter.commit();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index a7076d5..59dc10b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -27,8 +27,8 @@ import java.util.List;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.search.SearchTerm;
-
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
@@ -78,7 +78,16 @@ public class LuceneUtil {
         final String searchString = baseName + ".";
         for (final Path path : allProvenanceLogs) {
             if (path.toFile().getName().startsWith(searchString)) {
-                matchingFiles.add(path.toFile());
+               final File file = path.toFile();
+               if ( file.exists() ) {
+                       matchingFiles.add(file);
+               } else {
+                       final File dir = file.getParentFile();
+                       final File gzFile = new File(dir, file.getName() + 
".gz");
+                       if ( gzFile.exists() ) {
+                               matchingFiles.add(gzFile);
+                       }
+               }
             }
         }
 
@@ -132,6 +141,19 @@ public class LuceneUtil {
                     return filenameComp;
                 }
 
+                final IndexableField fileOffset1 = 
o1.getField(FieldNames.BLOCK_INDEX);
+                final IndexableField fileOffset2 = 
o1.getField(FieldNames.BLOCK_INDEX);
+                if ( fileOffset1 != null && fileOffset2 != null ) {
+                       final int blockIndexResult = 
Long.compare(fileOffset1.numericValue().longValue(), 
fileOffset2.numericValue().longValue());
+                       if ( blockIndexResult != 0 ) {
+                               return blockIndexResult;
+                       }
+                       
+                       final long eventId1 = 
o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                       final long eventId2 = 
o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                       return Long.compare(eventId1, eventId2);
+                }
+                
                 final long offset1 = 
o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 final long offset2 = 
o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 return Long.compare(offset1, offset2);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 862bc2b..8bdc88a 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -20,12 +20,79 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocReader;
 
 public interface RecordReader extends Closeable {
 
+       /**
+        * Returns the next record in the reader, or <code>null</code> if there 
is no more data available.
+        * @return
+        * @throws IOException
+        */
     StandardProvenanceEventRecord nextRecord() throws IOException;
 
+    /**
+     * Skips the specified number of bytes
+     * @param bytesToSkip
+     * @throws IOException
+     */
     void skip(long bytesToSkip) throws IOException;
 
+    /**
+     * Skips to the specified byte offset in the underlying stream.
+     * @param position
+     * @throws IOException if the underlying stream throws IOException, or if 
the reader has already
+     * passed the specified byte offset
+     */
     void skipTo(long position) throws IOException;
+    
+    /**
+     * Skips to the specified compression block
+     * 
+     * @param blockIndex
+     * @throws IOException if the underlying stream throws IOException, or if 
the reader has already
+     * read passed the specified compression block index
+     * @throws IllegalStateException if the RecordReader does not have a 
TableOfContents associated with it
+     */
+    void skipToBlock(int blockIndex) throws IOException;
+    
+    /**
+     * Returns the block index that the Reader is currently reading from.
+     * Note that the block index is incremented at the beginning of the {@link 
#nextRecord()}
+     * method. This means that this method will return the block from which 
the previous record was read, 
+     * if calling {@link #nextRecord()} continually, not the block from which 
the next record will be read.
+     * @return
+     */
+    int getBlockIndex();
+    
+    /**
+     * Returns <code>true</code> if the compression block index is available. 
It will be available
+     * if and only if the reader is created with a TableOfContents
+     * 
+     * @return
+     */
+    boolean isBlockIndexAvailable();
+    
+    /**
+     * Returns the {@link TocReader} that is used to keep track of compression 
blocks, if one exists,
+     * <code>null</code> otherwise
+     * @return
+     */
+    TocReader getTocReader();
+    
+    /**
+     * Returns the number of bytes that have been consumed from the stream 
(read or skipped).
+     * @return
+     */
+    long getBytesConsumed();
+    
+    /**
+     * Returns the ID of the last event in this record reader, or -1 if the 
reader has no records or
+     * has already read through all records. Note: This method will consume 
the stream until the end,
+     * so no more records will be available on this reader after calling this 
method.
+     * 
+     * @return
+     * @throws IOException
+     */
+    long getMaxEventId() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 8f06995..dff281c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.provenance.serialization;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -24,82 +23,90 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.Collection;
-import java.util.zip.GZIPInputStream;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.provenance.StandardRecordReader;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
 
 public class RecordReaders {
 
     public static RecordReader newRecordReader(File file, final 
Collection<Path> provenanceLogFiles) throws IOException {
         final File originalFile = file;
-        
-        if (!file.exists()) {
-            if (provenanceLogFiles == null) {
-                throw new FileNotFoundException(file.toString());
-            }
-
-            final String baseName = LuceneUtil.substringBefore(file.getName(), 
".") + ".";
-            for (final Path path : provenanceLogFiles) {
-                if (path.toFile().getName().startsWith(baseName)) {
-                    file = path.toFile();
-                    break;
-                }
-            }
-        }
-
         InputStream fis = null;
-        if ( file.exists() ) {
-            try {
-                fis = new FileInputStream(file);
-            } catch (final FileNotFoundException fnfe) {
-                fis = null;
-            }
-        }
-        
-        openStream: while ( fis == null ) {
-            final File dir = file.getParentFile();
-            final String baseName = LuceneUtil.substringBefore(file.getName(), 
".");
-            
-            // depending on which rollover actions have occurred, we could 
have 3 possibilities for the
-            // filename that we need. The majority of the time, we will use 
the extension ".prov.indexed.gz"
-            // because most often we are compressing on rollover and most 
often we have already finished
-            // compressing by the time that we are querying the data.
-            for ( final String extension : new String[] {".indexed.prov.gz", 
".indexed.prov", ".prov"} ) {
-                file = new File(dir, baseName + extension);
-                if ( file.exists() ) {
-                    try {
-                        fis = new FileInputStream(file);
-                        break openStream;
-                    } catch (final FileNotFoundException fnfe) {
-                        // file was modified by a RolloverAction after we 
verified that it exists but before we could
-                        // create an InputStream for it. Start over.
-                        fis = null;
-                        continue openStream;
-                    }
-                }
-            }
-            
-            break;
-        }
 
-        if ( fis == null ) {
-            throw new FileNotFoundException("Unable to locate file " + 
originalFile);
-        }
-        final InputStream readableStream;
-        if (file.getName().endsWith(".gz")) {
-            readableStream = new BufferedInputStream(new GZIPInputStream(fis));
-        } else {
-            readableStream = new BufferedInputStream(fis);
+        try {
+               if (!file.exists()) {
+                   if (provenanceLogFiles != null) {
+                           final String baseName = 
LuceneUtil.substringBefore(file.getName(), ".") + ".";
+                           for (final Path path : provenanceLogFiles) {
+                               if 
(path.toFile().getName().startsWith(baseName)) {
+                                   file = path.toFile();
+                                   break;
+                               }
+                           }
+                   }
+               }
+       
+               if ( file.exists() ) {
+                   try {
+                       fis = new FileInputStream(file);
+                   } catch (final FileNotFoundException fnfe) {
+                       fis = null;
+                   }
+               }
+               
+               String filename = file.getName();
+               openStream: while ( fis == null ) {
+                   final File dir = file.getParentFile();
+                   final String baseName = 
LuceneUtil.substringBefore(file.getName(), ".");
+                   
+                   // depending on which rollover actions have occurred, we 
could have 2 possibilities for the
+                   // filename that we need. The majority of the time, we will 
use the extension ".prov.gz"
+                   // because most often we are compressing on rollover and 
most often we have already finished
+                   // compressing by the time that we are querying the data.
+                   for ( final String extension : new String[] {".prov.gz", 
".prov"} ) {
+                       file = new File(dir, baseName + extension);
+                       if ( file.exists() ) {
+                           try {
+                               fis = new FileInputStream(file);
+                               filename = baseName + extension;
+                               break openStream;
+                           } catch (final FileNotFoundException fnfe) {
+                               // file was modified by a RolloverAction after 
we verified that it exists but before we could
+                               // create an InputStream for it. Start over.
+                               fis = null;
+                               continue openStream;
+                           }
+                       }
+                   }
+                   
+                   break;
+               }
+       
+               if ( fis == null ) {
+                   throw new FileNotFoundException("Unable to locate file " + 
originalFile);
+               }
+       
+               final File tocFile = TocUtil.getTocFile(file);
+               if ( tocFile.exists() ) {
+                       final TocReader tocReader = new 
StandardTocReader(tocFile);
+                       return new StandardRecordReader(fis, filename, 
tocReader);
+               } else {
+                       return new StandardRecordReader(fis, filename);
+               }
+        } catch (final IOException ioe) {
+               if ( fis != null ) {
+                       try {
+                               fis.close();
+                       } catch (final IOException inner) {
+                               ioe.addSuppressed(inner);
+                       }
+               }
+               
+               throw ioe;
         }
-
-        final DataInputStream dis = new DataInputStream(readableStream);
-        @SuppressWarnings("unused")
-        final String repoClassName = dis.readUTF();
-        final int serializationVersion = dis.readInt();
-
-        return new StandardRecordReader(dis, serializationVersion, 
file.getName());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index de98ab9..58f4dc2 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
 
 public interface RecordWriter extends Closeable {
 
@@ -82,4 +83,9 @@ public interface RecordWriter extends Closeable {
      */
     void sync() throws IOException;
 
+    /**
+     * Returns the TOC Writer that is being used to write the Table of 
Contents for this journal
+     * @return the TocWriter being used, or null if no Table of Contents is being written
+     */
+    TocWriter getTocWriter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 15349de..47b7c7e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -20,11 +20,20 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.StandardRecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
 
 public class RecordWriters {
+       private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024;  
// 1 MB
 
-    public static RecordWriter newRecordWriter(final File file) throws 
IOException {
-        return new StandardRecordWriter(file);
+    public static RecordWriter newRecordWriter(final File file, final boolean 
compressed, final boolean createToc) throws IOException {
+       return newRecordWriter(file, compressed, createToc, 
DEFAULT_COMPRESSION_BLOCK_SIZE);
+    }
+    
+    public static RecordWriter newRecordWriter(final File file, final boolean 
compressed, final boolean createToc, final int compressionBlockBytes) throws 
IOException {
+       final TocWriter tocWriter = createToc ? new 
StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+        return new StandardRecordWriter(file, tocWriter, compressed, 
compressionBlockBytes);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
new file mode 100644
index 0000000..8944cec
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ * 
+ * Expects .toc file to be in the following format:
+ * 
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = 
journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocReader implements TocReader {
+    private final boolean compressed;
+    private final long[] offsets;
+    
+    public StandardTocReader(final File file) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(file);
+             final DataInputStream dis = new DataInputStream(fis)) {
+            
+            final int version = dis.read();
+            if ( version < 0 ) {
+                throw new EOFException();
+            }
+            
+            final int compressionFlag = dis.read();
+            if ( compressionFlag < 0 ) {
+                throw new EOFException();
+            }
+            
+            if ( compressionFlag == 0 ) {
+                compressed = false;
+            } else if ( compressionFlag == 1 ) {
+                compressed = true;
+            } else {
+                throw new IOException("Table of Contents appears to be 
corrupt: could not read 'compression flag' from header; expected value of 0 or 
1 but got " + compressionFlag);
+            }
+            
+            final int numBlocks = (int) ((file.length() - 2) / 8);
+            offsets = new long[numBlocks];
+            
+            for (int i=0; i < numBlocks; i++) {
+                offsets[i] = dis.readLong();
+            }
+        }
+    }
+    
+    @Override
+    public boolean isCompressed() {
+        return compressed;
+    }
+    
+    @Override
+    public long getBlockOffset(final int blockIndex) {
+        if ( blockIndex >= offsets.length ) {
+            return -1L;
+        }
+        return offsets[blockIndex];
+    }
+
+    @Override
+    public long getLastBlockOffset() {
+        if ( offsets.length == 0 ) {
+            return 0L;
+        }
+        return offsets[offsets.length - 1];
+    }
+    
+    @Override
+    public void close() throws IOException {
+    }
+
+       @Override
+       public int getBlockIndex(final long blockOffset) {
+               for (int i=0; i < offsets.length; i++) {
+                       if ( offsets[i] > blockOffset ) {
+                               return i-1;
+                       }
+               }
+               
+               return offsets.length - 1;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
new file mode 100644
index 0000000..488f225
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Standard implementation of {@link TocWriter}.
+ * 
+ * Format of .toc file:
+ * byte 0: version
+ * byte 1: compressed: 0 -> not compressed, 1 -> compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocWriter implements TocWriter {
+       private static final Logger logger = 
LoggerFactory.getLogger(StandardTocWriter.class);
+       
+    public static final byte VERSION = 1;
+    
+    private final File file;
+    private final FileOutputStream fos;
+    private final boolean alwaysSync;
+    private int index = -1;
+    
+    /**
+     * Creates a StandardTocWriter that writes to the given file.
+     * @param file the file to write to
+     * @param compressionFlag whether or not the journal is compressed
+     * @throws IOException if the file cannot be created or the header cannot be written
+     */
+    public StandardTocWriter(final File file, final boolean compressionFlag, 
final boolean alwaysSync) throws IOException {
+        final File tocDir = file.getParentFile();
+        if ( !tocDir.exists() ) {
+               Files.createDirectories(tocDir.toPath());
+        }
+        
+        this.file = file;
+        fos = new FileOutputStream(file);
+        this.alwaysSync = alwaysSync;
+
+        final byte[] header = new byte[2];
+        header[0] = VERSION;
+        header[1] = (byte) (compressionFlag ? 1 : 0);
+        fos.write(header);
+        fos.flush();
+        
+        if ( alwaysSync ) {
+            sync();
+        }
+    }
+    
+    @Override
+    public void addBlockOffset(final long offset) throws IOException {
+        final BufferedOutputStream bos = new BufferedOutputStream(fos);
+        final DataOutputStream dos = new DataOutputStream(bos);
+        dos.writeLong(offset);
+        dos.flush();
+        index++;
+        logger.debug("Adding block {} at offset {}", index, offset);
+        
+        if ( alwaysSync ) {
+            sync();
+        }
+    }
+    
+    @Override
+    public void sync() throws IOException {
+       fos.getFD().sync();
+    }
+    
+    @Override
+    public int getCurrentBlockIndex() {
+        return index;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (alwaysSync) {
+            fos.getFD().sync();
+        }
+        
+        fos.close();
+    }
+    
+    @Override
+    public File getFile() {
+        return file;
+    }
+    
+    @Override
+    public String toString() {
+        return "TOC Writer for " + file;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
new file mode 100644
index 0000000..7c197be
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We 
use a Table of Contents
+ * to map a Block Index to an offset into the Journal file where that Block 
begins. We do this so that
+ * we can then persist a Block Index for an event and then compress the 
Journal later. This way, we can
+ * get good compression by compressing a large batch of events at once, and 
this way we can also look up
+ * an event in a Journal that has not been compressed by looking in the Table 
of Contents or lookup the
+ * event in a Journal post-compression by simply rewriting the TOC while we 
compress the data.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+    /**
+     * Indicates whether or not the corresponding Journal file is compressed
+     * @return
+     */
+    boolean isCompressed();
+
+    /**
+     * Returns the byte offset into the Journal File for the Block with the 
given index.
+     * @param blockIndex
+     * @return the byte offset of the Block, or -1 if the block index is out of range
+     */
+    long getBlockOffset(int blockIndex);
+    
+    /**
+     * Returns the byte offset into the Journal File of the last Block in the 
Table of Contents
+     * @return the offset of the last Block, or 0 if the Table of Contents contains no blocks
+     */
+    long getLastBlockOffset();
+    
+    /**
+     * Returns the index of the block that contains the given offset
+     * @param blockOffset
+     * @return the index of the Block that contains the given byte offset
+     */
+    int getBlockIndex(long blockOffset);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
new file mode 100644
index 0000000..c30ac98
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.File;
+
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+
+public class TocUtil {
+
+       /**
+        * Returns the file that should be used as the Table of Contents for 
the given Journal File
+        * @param journalFile
+        * @return
+        */
+       public static File getTocFile(final File journalFile) {
+       final File tocDir = new File(journalFile.getParentFile(), "toc");
+       final String basename = 
LuceneUtil.substringBefore(journalFile.getName(), ".");
+       final File tocFile = new File(tocDir, basename + ".toc");
+       return tocFile;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
new file mode 100644
index 0000000..c678053
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc file
+ */
+public interface TocWriter extends Closeable {
+
+    /**
+     * Adds the given block offset as the next Block Offset in the Table of 
Contents
+     * @param offset
+     * @throws IOException
+     */
+    void addBlockOffset(long offset) throws IOException;
+    
+    /**
+     * Returns the index of the current Block
+     * @return
+     */
+    int getCurrentBlockIndex();
+    
+    /**
+     * Returns the file that is currently being written to
+     * @return
+     */
+    File getFile();
+
+    /**
+     * Synchronizes the data with the underlying storage device
+     * @throws IOException
+     */
+    void sync() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5be208b..25a363f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -25,14 +26,14 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -45,7 +46,6 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -59,8 +59,10 @@ import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -72,87 +74,47 @@ public class TestPersistentProvenanceRepository {
     public TestName name = new TestName();
 
     private PersistentProvenanceRepository repo;
+    private RepositoryConfiguration config;
     
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
 
     private RepositoryConfiguration createConfiguration() {
-        final RepositoryConfiguration config = new RepositoryConfiguration();
+        config = new RepositoryConfiguration();
         config.addStorageDirectory(new File("target/storage/" + 
UUID.randomUUID().toString()));
-        config.setCompressOnRollover(false);
+        config.setCompressOnRollover(true);
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
+        config.setCompressionBlockBytes(100);
         return config;
     }
 
+    @BeforeClass
+    public static void setLogLevel() {
+       
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", 
"DEBUG");
+    }
+    
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + 
name.getMethodName() + "  *****************************");
     }
 
     @After
-    public void closeRepo() {
+    public void closeRepo() throws IOException {
         if (repo != null) {
             try {
                 repo.close();
             } catch (final IOException ioe) {
             }
         }
+        
+        // Delete all of the storage files. We do this in order to clean up 
the tons of files that
+        // we create but also to ensure that we have closed all of the file 
handles. If we leave any
+        // streams open, for instance, this will throw an IOException, causing 
our unit test to fail.
+        for ( final File storageDir : config.getStorageDirectories() ) {
+               FileUtils.deleteFile(storageDir, true);
+        }
     }
 
-    private FlowFile createFlowFile(final long id, final long fileSize, final 
Map<String, String> attributes) {
-        final Map<String, String> attrCopy = new HashMap<>(attributes);
-
-        return new FlowFile() {
-            @Override
-            public long getId() {
-                return id;
-            }
-
-            @Override
-            public long getEntryDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Set<String> getLineageIdentifiers() {
-                return new HashSet<String>();
-            }
-
-            @Override
-            public long getLineageStartDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Long getLastQueueDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public boolean isPenalized() {
-                return false;
-            }
-
-            @Override
-            public String getAttribute(final String s) {
-                return attrCopy.get(s);
-            }
-
-            @Override
-            public long getSize() {
-                return fileSize;
-            }
-
-            @Override
-            public Map<String, String> getAttributes() {
-                return attrCopy;
-            }
-
-            @Override
-            public int compareTo(final FlowFile o) {
-                return 0;
-            }
-        };
-    }
+    
 
     private EventReporter getEventReporter() {
         return new EventReporter() {
@@ -261,6 +223,8 @@ public class TestPersistentProvenanceRepository {
             repo.registerEvent(record);
         }
 
+        Thread.sleep(1000L);
+        
         repo.close();
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all 
file handles, etc.)
 
@@ -417,10 +381,10 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws 
IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
-        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L * 10);
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
-        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setMaxEventFileCapacity(1024L * 1024L * 10);
         config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS);
@@ -923,12 +887,16 @@ public class TestPersistentProvenanceRepository {
         final PersistentProvenanceRepository secondRepo = new 
PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         secondRepo.initialize(getEventReporter());
 
-        final ProvenanceEventRecord event11 = builder.build();
-        secondRepo.registerEvent(event11);
-        secondRepo.waitForRollover();
-        final ProvenanceEventRecord event11Retrieved = 
secondRepo.getEvent(10L);
-        assertNotNull(event11Retrieved);
-        assertEquals(10, event11Retrieved.getEventId());
+        try {
+               final ProvenanceEventRecord event11 = builder.build();
+               secondRepo.registerEvent(event11);
+               secondRepo.waitForRollover();
+               final ProvenanceEventRecord event11Retrieved = 
secondRepo.getEvent(10L);
+               assertNotNull(event11Retrieved);
+               assertEquals(10, event11Retrieved.getEventId());
+        } finally {
+               secondRepo.close();
+        }
     }
 
     @Test
@@ -998,6 +966,73 @@ public class TestPersistentProvenanceRepository {
         storageDirFiles = 
config.getStorageDirectories().get(0).listFiles(indexFileFilter);
         assertEquals(0, storageDirFiles.length);
     }
+    
+    
+    @Test
+    public void testBackPressure() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileCapacity(1L);    // force rollover on each 
record.
+        config.setJournalCount(1);
+        
+        final AtomicInteger journalCountRef = new AtomicInteger(0);
+        
+       repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+               @Override
+               protected int getJournalCount() {
+                       return journalCountRef.get();
+               }
+       };
+        repo.initialize(getEventReporter());
+
+       final Map<String, String> attributes = new HashMap<>();
+       final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", UUID.randomUUID().toString());
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        // ensure that we can register the events.
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        // set number of journals to 6 so that we will block.
+        journalCountRef.set(6);
+
+        final AtomicLong threadNanos = new AtomicLong(0L);
+        final Thread t = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               final long start = System.nanoTime();
+                       builder.fromFlowFile(createFlowFile(13, 3000L, 
attributes));
+                       attributes.put("uuid", 
"00000000-0000-0000-0000-00000000000" + 13);
+                       repo.registerEvent(builder.build());
+                       threadNanos.set(System.nanoTime() - start);
+                       }
+        });
+        t.start();
+
+        Thread.sleep(1500L);
+        
+        journalCountRef.set(1);
+        t.join();
+        
+        final int threadMillis = (int) 
TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
+        assertTrue(threadMillis > 1200);       // use 1200 to account for the 
fact that the timing is not exact
+        
+        builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
+        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
+        repo.registerEvent(builder.build());
+    }
+    
+    
+    // TODO: test EOF on merge
+    // TODO: Test journal with no records
 
     @Test
     public void testTextualQuery() throws InterruptedException, IOException, 
ParseException {

Reply via email to