NIFI-527: Merging develop

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5ac48a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5ac48a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5ac48a0

Branch: refs/heads/develop
Commit: a5ac48a03c362dcb0b253741157d79e8791eb2d5
Parents: f442d55
Author: Mark Payne <[email protected]>
Authored: Mon Apr 27 09:52:33 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Apr 27 09:52:33 2015 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 592 ++++++++++---------
 .../provenance/RepositoryConfiguration.java     |  14 +-
 .../nifi/provenance/StandardRecordReader.java   | 223 ++++++-
 .../nifi/provenance/StandardRecordWriter.java   | 114 +++-
 .../provenance/lucene/DeleteIndexAction.java    |  75 +--
 .../nifi/provenance/lucene/DocsReader.java      | 100 +++-
 .../nifi/provenance/lucene/FieldNames.java      |   1 +
 .../nifi/provenance/lucene/IndexManager.java    | 467 +++++++++++++++
 .../nifi/provenance/lucene/IndexSearch.java     |  71 ++-
 .../nifi/provenance/lucene/IndexingAction.java  | 183 +++---
 .../nifi/provenance/lucene/LuceneUtil.java      |  26 +-
 .../provenance/serialization/RecordReader.java  |  67 +++
 .../provenance/serialization/RecordReaders.java | 139 ++---
 .../provenance/serialization/RecordWriter.java  |   6 +
 .../provenance/serialization/RecordWriters.java |  13 +-
 .../nifi/provenance/toc/StandardTocReader.java  | 108 ++++
 .../nifi/provenance/toc/StandardTocWriter.java  | 120 ++++
 .../apache/nifi/provenance/toc/TocReader.java   |  58 ++
 .../org/apache/nifi/provenance/toc/TocUtil.java |  37 ++
 .../apache/nifi/provenance/toc/TocWriter.java   |  52 ++
 .../TestPersistentProvenanceRepository.java     | 175 +++---
 .../TestStandardRecordReaderWriter.java         | 189 ++++++
 .../org/apache/nifi/provenance/TestUtil.java    |  82 +++
 .../provenance/toc/TestStandardTocReader.java   |  91 +++
 .../provenance/toc/TestStandardTocWriter.java   |  42 ++
 25 files changed, 2426 insertions(+), 619 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 0502cc7..48cc164 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -58,6 +57,14 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
@@ -67,12 +74,11 @@ import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageComputationType;
 import org.apache.nifi.provenance.lucene.DeleteIndexAction;
 import org.apache.nifi.provenance.lucene.FieldNames;
+import org.apache.nifi.provenance.lucene.IndexManager;
 import org.apache.nifi.provenance.lucene.IndexSearch;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.lucene.LineageQuery;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
-import org.apache.nifi.provenance.rollover.CompressionAction;
-import org.apache.nifi.provenance.rollover.RolloverAction;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -81,18 +87,12 @@ import 
org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.StopWatch;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexNotFoundException;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +102,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     public static final String EVENT_CATEGORY = "Provenance Repository";
     private static final String FILE_EXTENSION = ".prov";
     private static final String TEMP_FILE_SUFFIX = ".prov.part";
-    public static final int SERIALIZATION_VERSION = 7;
+    public static final int SERIALIZATION_VERSION = 8;
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
     public static final Pattern LOG_FILENAME_PATTERN = 
Pattern.compile("(\\d+).*\\.prov");
@@ -129,14 +129,14 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     private final AtomicLong streamStartTime = new 
AtomicLong(System.currentTimeMillis());
     private final RepositoryConfiguration configuration;
     private final IndexConfiguration indexConfig;
+    private final IndexManager indexManager;
     private final boolean alwaysSync;
     private final int rolloverCheckMillis;
 
     private final ScheduledExecutorService scheduledExecService;
-    private final ExecutorService rolloverExecutor;
+    private final ScheduledExecutorService rolloverExecutor;
     private final ExecutorService queryExecService;
 
-    private final List<RolloverAction> rolloverActions = new ArrayList<>();
     private final List<ExpirationAction> expirationActions = new ArrayList<>();
 
     private final IndexingAction indexingAction;
@@ -181,22 +181,18 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         this.maxPartitionMillis = 
configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
         this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
         this.indexConfig = new IndexConfiguration(configuration);
+        this.indexManager = new IndexManager();
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
         
         final List<SearchableField> fields = 
configuration.getSearchableFields();
         if (fields != null && !fields.isEmpty()) {
             indexingAction = new IndexingAction(this, indexConfig);
-            rolloverActions.add(indexingAction);
         } else {
             indexingAction = null;
         }
 
-        if (configuration.isCompressOnRollover()) {
-            rolloverActions.add(new CompressionAction());
-        }
-
-        scheduledExecService = Executors.newScheduledThreadPool(3);
+        scheduledExecService = Executors.newScheduledThreadPool(3, new 
NamedThreadFactory("Provenance Maintenance Thread"));
         queryExecService = 
Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new 
NamedThreadFactory("Provenance Query Thread"));
 
         // The number of rollover threads is a little bit arbitrary but comes 
from the idea that multiple storage directories generally
@@ -204,69 +200,74 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         // disks efficiently. However, the rollover actions can be somewhat 
CPU intensive, so we double the number of threads in order
         // to account for that.
         final int numRolloverThreads = 
configuration.getStorageDirectories().size() * 2;
-        rolloverExecutor = Executors.newFixedThreadPool(numRolloverThreads, 
new NamedThreadFactory("Provenance Repository Rollover Thread"));
+        rolloverExecutor = 
Executors.newScheduledThreadPool(numRolloverThreads, new 
NamedThreadFactory("Provenance Repository Rollover Thread"));
     }
 
     @Override
     public void initialize(final EventReporter eventReporter) throws 
IOException {
-        if (initialized.getAndSet(true)) {
-            return;
-        }
-
-        this.eventReporter = eventReporter;
-
-        recover();
-
-        if (configuration.isAllowRollover()) {
-            writers = createWriters(configuration, idGenerator.get());
-        }
-
-        if (configuration.isAllowRollover()) {
-            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-                @Override
-                public void run() {
-                    // Check if we need to roll over
-                    if (needToRollover()) {
-                        // it appears that we do need to roll over. Obtain 
write lock so that we can do so, and then
-                        // confirm that we still need to.
-                        writeLock.lock();
-                        try {
-                            logger.debug("Obtained write lock to perform 
periodic rollover");
-
-                            if (needToRollover()) {
-                                try {
-                                    rollover(false);
-                                } catch (final Exception e) {
-                                    logger.error("Failed to roll over 
Provenance Event Log due to {}", e.toString());
-                                    logger.error("", e);
-                                }
-                            }
-                        } finally {
-                            writeLock.unlock();
-                        }
-                    }
-                }
-            }, rolloverCheckMillis, rolloverCheckMillis, 
TimeUnit.MILLISECONDS);
-
-            scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
-            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        purgeOldEvents();
-                    } catch (final Exception e) {
-                        logger.error("Failed to purge old events from 
Provenance Repo due to {}", e.toString());
-                        if (logger.isDebugEnabled()) {
-                            logger.error("", e);
-                        }
-                        eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + 
e.toString());
-                    }
-                }
-            }, 1L, 1L, TimeUnit.MINUTES);
-
-            expirationActions.add(new DeleteIndexAction(this, indexConfig));
-            expirationActions.add(new FileRemovalAction());
-        }
+       writeLock.lock();
+       try {
+               if (initialized.getAndSet(true)) {
+                   return;
+               }
+       
+               this.eventReporter = eventReporter;
+       
+               recover();
+       
+               if (configuration.isAllowRollover()) {
+                   writers = createWriters(configuration, idGenerator.get());
+               }
+       
+               if (configuration.isAllowRollover()) {
+                   scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                           // Check if we need to roll over
+                           if (needToRollover()) {
+                               // it appears that we do need to roll over. 
Obtain write lock so that we can do so, and then
+                               // confirm that we still need to.
+                               writeLock.lock();
+                               try {
+                                   logger.debug("Obtained write lock to 
perform periodic rollover");
+       
+                                   if (needToRollover()) {
+                                       try {
+                                           rollover(false);
+                                       } catch (final Exception e) {
+                                           logger.error("Failed to roll over 
Provenance Event Log due to {}", e.toString());
+                                           logger.error("", e);
+                                       }
+                                   }
+                               } finally {
+                                   writeLock.unlock();
+                               }
+                           }
+                       }
+                   }, rolloverCheckMillis, rolloverCheckMillis, 
TimeUnit.MILLISECONDS);
+       
+                   scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+                   scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                           try {
+                               purgeOldEvents();
+                           } catch (final Exception e) {
+                               logger.error("Failed to purge old events from 
Provenance Repo due to {}", e.toString());
+                               if (logger.isDebugEnabled()) {
+                                   logger.error("", e);
+                               }
+                               eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + 
e.toString());
+                           }
+                       }
+                   }, 1L, 1L, TimeUnit.MINUTES);
+       
+                   expirationActions.add(new DeleteIndexAction(this, 
indexConfig, indexManager));
+                   expirationActions.add(new FileRemovalAction());
+               }
+       } finally {
+               writeLock.unlock();
+       }
     }
 
     private static RepositoryConfiguration createRepositoryConfiguration() 
throws IOException {
@@ -334,10 +335,11 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             final File journalDirectory = new File(storageDirectory, 
"journals");
             final File journalFile = new File(journalDirectory, 
String.valueOf(initialRecordId) + ".journal." + i);
 
-            writers[i] = RecordWriters.newRecordWriter(journalFile);
+            writers[i] = RecordWriters.newRecordWriter(journalFile, false, 
false);
             writers[i].writeHeader();
         }
 
+        logger.info("Created new Provenance Event Writers for events starting 
with ID {}", initialRecordId);
         return writers;
     }
 
@@ -501,18 +503,15 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
             // Determine the max ID in the last file.
             try (final RecordReader reader = 
RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
-                ProvenanceEventRecord record;
-                while ((record = reader.nextRecord()) != null) {
-                    final long eventId = record.getEventId();
-                    if (eventId > maxId) {
-                        maxId = eventId;
-                    }
+               final long eventId = reader.getMaxEventId();
+                if (eventId > maxId) {
+                    maxId = eventId;
+                }
 
-                    // If the ID is greater than the max indexed id and this 
file was indexed, then
-                    // update the max indexed id
-                    if (eventId > maxIndexedId && lastFileIndexed) {
-                        maxIndexedId = eventId;
-                    }
+                // If the ID is greater than the max indexed id and this file 
was indexed, then
+                // update the max indexed id
+                if (eventId > maxIndexedId && lastFileIndexed) {
+                    maxIndexedId = eventId;
                 }
             } catch (final IOException ioe) {
                 logger.error("Failed to read Provenance Event File {} due to 
{}", maxIdFile, ioe);
@@ -568,16 +567,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             // Read the records in the last file to find its max id
             if (greatestMinIdFile != null) {
                 try (final RecordReader recordReader = 
RecordReaders.newRecordReader(greatestMinIdFile, 
Collections.<Path>emptyList())) {
-                    StandardProvenanceEventRecord record;
-
-                    try {
-                        while ((record = recordReader.nextRecord()) != null) {
-                            if (record.getEventId() > maxId) {
-                                maxId = record.getEventId();
-                            }
-                        }
-                    } catch (final EOFException eof) {
-                    }
+                       maxId = recordReader.getMaxEventId();
                 }
             }
 
@@ -599,46 +589,11 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
 
         logger.info("Recovered {} records", recordsRecovered);
-
-        final List<RolloverAction> rolloverActions = this.rolloverActions;
-        final Runnable retroactiveRollover = new Runnable() {
-            @Override
-            public void run() {
-                for (File toRecover : filesToRecover) {
-                    final String baseFileName = 
LuceneUtil.substringBefore(toRecover.getName(), ".");
-                    final Long fileFirstEventId = Long.parseLong(baseFileName);
-
-                    for (final RolloverAction action : rolloverActions) {
-                        if (!action.hasBeenPerformed(toRecover)) {
-                            try {
-                                final StopWatch stopWatch = new 
StopWatch(true);
-
-                                toRecover = action.execute(toRecover);
-
-                                stopWatch.stop();
-                                final String duration = 
stopWatch.getDuration();
-                                logger.info("Successfully performed 
retroactive action {} against {} in {}", action, toRecover, duration);
-
-                                // update our map of id to Path
-                                final Map<Long, Path> updatedMap = 
addToPathMap(fileFirstEventId, toRecover.toPath());
-                                logger.trace("After retroactive rollover 
action {}, Path Map: {}", action, updatedMap);
-                            } catch (final Exception e) {
-                                logger.error("Failed to perform retroactive 
rollover actions on {} due to {}", toRecover, e.toString());
-                                logger.error("", e);
-                                eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to perform retroactive rollover actions on " + 
toRecover + " due to " + e.toString());
-                            }
-                        }
-                    }
-                }
-            }
-        };
-        rolloverExecutor.submit(retroactiveRollover);
-
         recoveryFinished.set(true);
     }
 
     @Override
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         writeLock.lock();
         try {
             logger.debug("Obtained write lock for close");
@@ -648,8 +603,12 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             rolloverExecutor.shutdownNow();
             queryExecService.shutdownNow();
 
-            for (final RecordWriter writer : writers) {
-                writer.close();
+            indexManager.close();
+            
+            if ( writers != null ) {
+                   for (final RecordWriter writer : writers) {
+                       writer.close();
+                   }
             }
         } finally {
             writeLock.unlock();
@@ -945,6 +904,21 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
     }
 
+    // made protected for testing purposes
+    protected int getJournalCount() {
+       // determine how many 'journals' we have in the journals directories
+        int journalFileCount = 0;
+        for ( final File storageDir : configuration.getStorageDirectories() ) {
+               final File journalsDir = new File(storageDir, "journals");
+               final File[] journalFiles = journalsDir.listFiles();
+               if ( journalFiles != null ) {
+                       journalFileCount += journalFiles.length;
+               }
+        }
+        
+        return journalFileCount;
+    }
+    
     /**
      * MUST be called with the write lock held
      *
@@ -963,9 +937,45 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             for (final RecordWriter writer : writers) {
                 final File writerFile = writer.getFile();
                 journalsToMerge.add(writerFile);
-                writer.close();
+                try {
+                       writer.close();
+                } catch (final IOException ioe) {
+                       logger.warn("Failed to close {} due to {}", writer, 
ioe.toString());
+                       if ( logger.isDebugEnabled() ) {
+                               logger.warn("", ioe);
+                       }
+                }
+            }
+            if ( logger.isDebugEnabled() ) {
+               logger.debug("Going to merge {} files for journals starting 
with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
+            int journalFileCount = getJournalCount();
+            final int journalCountThreshold = configuration.getJournalCount() 
* 5;
+            if ( journalFileCount > journalCountThreshold ) {
+               logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
+                               + "Slowing down flow to accomodate. Currently, 
there are {} journal files and "
+                               + "threshold for blocking is {}", 
journalFileCount, journalCountThreshold);
+               eventReporter.reportEvent(Severity.WARNING, "Provenance 
Repository", "The rate of the dataflow is "
+                               + "exceeding the provenance recording 
rate. Slowing down flow to accomodate");
+               
+               while (journalFileCount > journalCountThreshold) {
+                       try {
+                               Thread.sleep(1000L);
+                       } catch (final InterruptedException ie) {
+                       }
+                       
+                       logger.debug("Provenance Repository is still behind. 
Keeping flow slowed down "
+                                       + "to accomodate. Currently, there are 
{} journal files and "
+                                       + "threshold for blocking is {}", 
journalFileCount, journalCountThreshold);
+
+                       journalFileCount = getJournalCount();
+               }
+               
+               logger.info("Provenance Repository has no caught up with 
rolling over journal files. Current number of "
+                               + "journal files to be rolled over is {}", 
journalFileCount);
+            }
+            
             writers = createWriters(configuration, idGenerator.get());
             streamStartTime.set(System.currentTimeMillis());
             recordsWrittenSinceRollover.getAndSet(0);
@@ -974,60 +984,29 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             final List<File> storageDirs = 
configuration.getStorageDirectories();
             final File storageDir = storageDirs.get((int) (storageDirIdx % 
storageDirs.size()));
 
-            final List<RolloverAction> actions = rolloverActions;
+            final AtomicReference<Future<?>> futureReference = new 
AtomicReference<>();
             final int recordsWritten = 
recordsWrittenSinceRollover.getAndSet(0);
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
-                    final File fileRolledOver;
-
-                    try {
-                        fileRolledOver = mergeJournals(journalsToMerge, 
storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, 
latestRecords);
-                        repoDirty.set(false);
-                    } catch (final IOException ioe) {
-                        repoDirty.set(true);
-                        logger.error("Failed to merge Journal Files {} into a 
Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                        logger.error("", ioe);
-                        return;
-                    }
-
-                    if (fileRolledOver == null) {
-                        return;
-                    }
-                    File file = fileRolledOver;
-
-                    for (final RolloverAction action : actions) {
-                        try {
-                            final StopWatch stopWatch = new StopWatch(true);
-                            file = action.execute(file);
-                            stopWatch.stop();
-                            logger.info("Successfully performed Rollover 
Action {} for {} in {}", action, file, stopWatch.getDuration());
-
-                            // update our map of id to Path
-                            // need lock to update the map, even though it's 
an AtomicReference, AtomicReference allows those doing a
-                            // get() to obtain the most up-to-date version but 
we use a writeLock to prevent multiple threads modifying
-                            // it at one time
-                            writeLock.lock();
-                            try {
-                                final Long fileFirstEventId = 
Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
-                                SortedMap<Long, Path> newIdToPathMap = new 
TreeMap<>(new PathMapComparator());
-                                newIdToPathMap.putAll(idToPathMap.get());
-                                newIdToPathMap.put(fileFirstEventId, 
file.toPath());
-                                idToPathMap.set(newIdToPathMap);
-                                logger.trace("After rollover action {}, path 
map: {}", action, newIdToPathMap);
-                            } finally {
-                                writeLock.unlock();
-                            }
-                        } catch (final Throwable t) {
-                            logger.error("Failed to perform Rollover Action {} 
for {}: got Exception {}",
-                                    action, fileRolledOver, t.toString());
-                            logger.error("", t);
-
-                            return;
-                        }
-                    }
-
-                    if (actions.isEmpty()) {
+                       try {
+                           final File fileRolledOver;
+       
+                           try {
+                               fileRolledOver = mergeJournals(journalsToMerge, 
storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, 
latestRecords);
+                               repoDirty.set(false);
+                           } catch (final IOException ioe) {
+                               repoDirty.set(true);
+                               logger.error("Failed to merge Journal Files {} 
into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                               logger.error("", ioe);
+                               return;
+                           }
+       
+                           if (fileRolledOver == null) {
+                               return;
+                           }
+                           File file = fileRolledOver;
+       
                         // update our map of id to Path
                         // need lock to update the map, even though it's an 
AtomicReference, AtomicReference allows those doing a
                         // get() to obtain the most up-to-date version but we 
use a writeLock to prevent multiple threads modifying
@@ -1042,35 +1021,37 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         } finally {
                             writeLock.unlock();
                         }
-                    }
-
-                    logger.info("Successfully Rolled over Provenance Event 
file containing {} records", recordsWritten);
-                    rolloverCompletions.getAndIncrement();
+       
+                           logger.info("Successfully Rolled over Provenance 
Event file containing {} records", recordsWritten);
+                           rolloverCompletions.getAndIncrement();
+                           
+                           // We have finished successfully. Cancel the future 
so that we don't run anymore
+                           Future<?> future;
+                           while ((future = futureReference.get()) == null) {
+                               try {
+                                       Thread.sleep(10L);
+                               } catch (final InterruptedException ie) {
+                               }
+                           }
+                           
+                           future.cancel(false);
+                       } catch (final Throwable t) {
+                               logger.error("Failed to rollover Provenance 
repository due to {}", t.toString());
+                               logger.error("", t);
+                       }
                 }
             };
 
-            rolloverExecutor.submit(rolloverRunnable);
+            // We are going to schedule the future to run every 10 seconds. 
This allows us to keep retrying if we
+            // fail for some reason. When we succeed, the Runnable will cancel 
itself.
+            final Future<?> future = 
rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, 
TimeUnit.SECONDS);
+            futureReference.set(future);
 
             streamStartTime.set(System.currentTimeMillis());
             bytesWrittenSinceRollover.set(0);
         }
     }
 
-    private SortedMap<Long, Path> addToPathMap(final Long firstEventId, final 
Path path) {
-        SortedMap<Long, Path> unmodifiableMap;
-        boolean updated = false;
-        do {
-            final SortedMap<Long, Path> existingMap = idToPathMap.get();
-            final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new 
PathMapComparator());
-            newIdToPathMap.putAll(existingMap);
-            newIdToPathMap.put(firstEventId, path);
-            unmodifiableMap = 
Collections.unmodifiableSortedMap(newIdToPathMap);
-
-            updated = idToPathMap.compareAndSet(existingMap, unmodifiableMap);
-        } while (!updated);
-
-        return unmodifiableMap;
-    }
 
     private Set<File> recoverJournalFiles() throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -1093,6 +1074,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
 
             for (final File journalFile : journalFiles) {
+               if ( journalFile.isDirectory() ) {
+                       continue;
+               }
+               
                 final String basename = 
LuceneUtil.substringBefore(journalFile.getName(), ".");
                 List<File> files = journalMap.get(basename);
                 if (files == null) {
@@ -1135,22 +1120,92 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return mergedFile;
     }
 
-    static File mergeJournals(final List<File> journalFiles, final File 
storageDir, final File mergedFile, final EventReporter eventReporter, final 
RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
-        final long startNanos = System.nanoTime();
+    File mergeJournals(final List<File> journalFiles, final File storageDir, 
final File mergedFile, final EventReporter eventReporter, final 
RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
+       logger.debug("Merging {} to {}", journalFiles, mergedFile);
+       if ( this.closed ) {
+               logger.info("Provenance Repository has been closed; will not 
merge journal files to {}", mergedFile);
+               return null;
+       }
+       
         if (journalFiles.isEmpty()) {
             return null;
         }
 
-        if (mergedFile.exists()) {
-            throw new FileAlreadyExistsException("Cannot Merge " + 
journalFiles.size() + " Journal Files into Merged Provenance Log File " + 
mergedFile.getAbsolutePath() + " because the Merged File already exists");
+        Collections.sort(journalFiles, new Comparator<File>() {
+                       @Override
+                       public int compare(final File o1, final File o2) {
+                               final String suffix1 = 
LuceneUtil.substringAfterLast(o1.getName(), ".");
+                               final String suffix2 = 
LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+                               try {
+                                       final int journalIndex1 = 
Integer.parseInt(suffix1);
+                                       final int journalIndex2 = 
Integer.parseInt(suffix2);
+                                       return Integer.compare(journalIndex1, 
journalIndex2);
+                               } catch (final NumberFormatException nfe) {
+                                       return 
o1.getName().compareTo(o2.getName());
+                               }
+                       }
+        });
+        
+        final String firstJournalFile = journalFiles.get(0).getName();
+        final String firstFileSuffix = 
LuceneUtil.substringAfterLast(firstJournalFile, ".");
+        final boolean allPartialFiles = firstFileSuffix.equals("0");
+        
+        // check if we have all of the "partial" files for the journal.
+        if (allPartialFiles) {
+               if ( mergedFile.exists() ) {
+                       // we have all "partial" files and there is already a 
merged file. Delete the data from the index
+                       // because the merge file may not be fully merged. We 
will re-merge.
+                       logger.warn("Merged Journal File {} already exists; 
however, all partial journal files also exist "
+                                       + "so assuming that the merge did not 
finish. Repeating procedure in order to ensure consistency.");
+                       
+                       final DeleteIndexAction deleteAction = new 
DeleteIndexAction(this, indexConfig, indexManager);
+                       try {
+                               deleteAction.execute(mergedFile);
+                       } catch (final Exception e) {
+                               logger.warn("Failed to delete records from 
Journal File {} from the index; this could potentially result in duplicates. 
Failure was due to {}", mergedFile, e.toString());
+                               if ( logger.isDebugEnabled() ) {
+                                       logger.warn("", e);
+                               }
+                       }
+
+                       // Since we only store the file's basename, block 
offset, and event ID, and because the newly created file could end up on
+                       // a different Storage Directory than the original, we 
need to ensure that we delete both the partially merged
+                       // file and the TOC file. Otherwise, we could get the 
wrong copy and have issues retrieving events.
+                       if ( !mergedFile.delete() ) {
+                               logger.error("Failed to delete partially 
written Provenance Journal File {}. This may result in events from this journal 
"
+                                               + "file not being able to be 
displayed. This file should be deleted manually.", mergedFile);
+                       }
+                       
+                       final File tocFile = TocUtil.getTocFile(mergedFile);
+                       if ( tocFile.exists() && !tocFile.delete() ) {
+                               logger.error("Failed to delete .toc file {}; 
this may result in not being able to read the Provenance Events from the {} 
Journal File. "
+                                               + "This can be corrected by 
manually deleting the {} file", tocFile, mergedFile, tocFile);
+                       }
+               }
+        } else {
+               logger.warn("Cannot merge journal files {} because expected 
first file to end with extension '.0' "
+                               + "but it did not; assuming that the files were 
already merged but only some finished deletion "
+                               + "before restart. Deleting remaining partial 
journal files.", journalFiles);
+               
+               for ( final File file : journalFiles ) {
+                       if ( !file.delete() && file.exists() ) {
+                               logger.warn("Failed to delete unneeded journal 
file {}; this file should be cleaned up manually", file);
+                       }
+               }
+               
+               return null;
         }
-
-        final File tempMergedFile = new File(mergedFile.getParentFile(), 
mergedFile.getName() + ".part");
+        
+        final long startNanos = System.nanoTime();
 
         // Map each journal to a RecordReader
         final List<RecordReader> readers = new ArrayList<>();
         int records = 0;
 
+        final boolean isCompress = configuration.isCompressOnRollover();
+        final File writerFile = isCompress ? new 
File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile;
+
         try {
             for (final File journalFile : journalFiles) {
                 try {
@@ -1203,32 +1258,50 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
             // loop over each entry in the map, persisting the records to the 
merged file in order, and populating the map
             // with the next entry from the journal file from which the 
previous record was written.
-            try (final RecordWriter writer = 
RecordWriters.newRecordWriter(tempMergedFile)) {
+            try (final RecordWriter writer = 
RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), 
true)) {
                 writer.writeHeader();
 
-                while (!recordToReaderMap.isEmpty()) {
-                    final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                    final StandardProvenanceEventRecord record = 
entry.getKey();
-                    final RecordReader reader = entry.getValue();
-
-                    writer.writeRecord(record, record.getEventId());
-                    ringBuffer.add(record);
-                    records++;
-
-                    // Remove this entry from the map
-                    recordToReaderMap.remove(record);
-
-                    // Get the next entry from this reader and add it to the 
map
-                    StandardProvenanceEventRecord nextRecord = null;
-
-                    try {
-                        nextRecord = reader.nextRecord();
-                    } catch (final EOFException eof) {
-                    }
-
-                    if (nextRecord != null) {
-                        recordToReaderMap.put(nextRecord, reader);
-                    }
+                final IndexingAction indexingAction = new IndexingAction(this, 
indexConfig);
+                
+                final File indexingDirectory = 
indexConfig.getWritableIndexDirectory(writerFile);
+                final IndexWriter indexWriter = 
indexManager.borrowIndexWriter(indexingDirectory);
+                try {
+                       long maxId = 0L;
+                       
+                       while (!recordToReaderMap.isEmpty()) {
+                           final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                           final StandardProvenanceEventRecord record = 
entry.getKey();
+                           final RecordReader reader = entry.getValue();
+       
+                           writer.writeRecord(record, record.getEventId());
+                           final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
+                           
+                           indexingAction.index(record, indexWriter, 
blockIndex);
+                           maxId = record.getEventId();
+                           
+                           ringBuffer.add(record);
+                           records++;
+       
+                           // Remove this entry from the map
+                           recordToReaderMap.remove(record);
+       
+                           // Get the next entry from this reader and add it 
to the map
+                           StandardProvenanceEventRecord nextRecord = null;
+       
+                           try {
+                               nextRecord = reader.nextRecord();
+                           } catch (final EOFException eof) {
+                           }
+       
+                           if (nextRecord != null) {
+                               recordToReaderMap.put(nextRecord, reader);
+                           }
+                       }
+                       
+                       indexWriter.commit();
+                       indexConfig.setMaxIdIndexed(maxId);
+                } finally {
+                       indexManager.returnIndexWriter(indexingDirectory, 
indexWriter);
                 }
             }
         } finally {
@@ -1240,37 +1313,22 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
         }
 
-        // Attempt to rename. Keep trying for a bit if we fail. This happens 
often if we have some external process
-        // that locks files, such as a virus scanner.
-        boolean renamed = false;
-        for (int i = 0; i < 10 && !renamed; i++) {
-            renamed = tempMergedFile.renameTo(mergedFile);
-            if (!renamed) {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException ie) {
-                }
-            }
-        }
-
-        if (!renamed) {
-            throw new IOException("Failed to merge journal files into single 
merged file " + mergedFile.getAbsolutePath() + " because " + 
tempMergedFile.getAbsolutePath() + " could not be renamed");
-        }
-
         // Success. Remove all of the journal files, as they're no longer 
needed, now that they've been merged.
         for (final File journalFile : journalFiles) {
-            if (!journalFile.delete()) {
-                if (journalFile.exists()) {
-                    logger.warn("Failed to remove temporary journal file {}; 
this file should be cleaned up manually", journalFile.getAbsolutePath());
-                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to remove temporary journal file " + 
journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
-                } else {
-                    logger.warn("Failed to remove temporary journal file {} 
because it no longer exists", journalFile.getAbsolutePath());
-                }
+            if (!journalFile.delete() && journalFile.exists()) {
+                logger.warn("Failed to remove temporary journal file {}; this 
file should be cleaned up manually", journalFile.getAbsolutePath());
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; 
this file should be cleaned up manually");
+            }
+            
+            final File tocFile = TocUtil.getTocFile(journalFile);
+            if (!tocFile.delete() && tocFile.exists()) {
+                logger.warn("Failed to remove temporary journal TOC file {}; 
this file should be cleaned up manually", tocFile.getAbsolutePath());
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; 
this file should be cleaned up manually");
             }
         }
 
         if (records == 0) {
-            mergedFile.delete();
+            writerFile.delete();
             return null;
         } else {
             final long nanos = System.nanoTime() - startNanos;
@@ -1278,7 +1336,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             logger.info("Successfully merged {} journal files ({} records) 
into single Provenance Log File {} in {} milliseconds", journalFiles.size(), 
records, mergedFile, millis);
         }
 
-        return mergedFile;
+        return writerFile;
     }
 
     @Override
@@ -1779,7 +1837,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         @Override
         public void run() {
             try {
-                final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir);
+                final IndexSearch search = new 
IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager);
                 final StandardQueryResult queryResult = search.search(query, 
retrievalCount);
                 submission.getResult().update(queryResult.getMatchingEvents(), 
queryResult.getTotalHitCount());
                 if (queryResult.isFinished()) {
@@ -1787,7 +1845,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                             query, indexDir, queryResult.getQueryTime(), 
queryResult.getTotalHitCount());
                 }
             } catch (final Throwable t) {
-                logger.error("Failed to query provenance repository due to 
{}", t.toString());
+                logger.error("Failed to query Provenance Repository Index {} 
due to {}", indexDir, t.toString());
                 if (logger.isDebugEnabled()) {
                     logger.error("", t);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index d47df4f..3951591 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -33,7 +33,8 @@ public class RepositoryConfiguration {
     private long eventFileBytes = 1024L * 1024L * 5L;   // 5 MB
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
-
+    private int compressionBlockBytes = 1024 * 1024;
+    
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
@@ -49,7 +50,16 @@ public class RepositoryConfiguration {
         return allowRollover;
     }
 
-    /**
+    
+    /**
+     * Returns the configured compression block size in bytes (field default: 1024 * 1024).
+     * NOTE(review): presumably the amount of uncompressed data written per compressed
+     * block of a Provenance Log; confirm against where this value is consumed.
+     *
+     * @return the compression block size, in bytes
+     */
+    public int getCompressionBlockBytes() {
+               return compressionBlockBytes;
+       }
+
+       /**
+        * Sets the value returned by {@link #getCompressionBlockBytes()}. The value is
+        * stored as-is; no range validation is performed.
+        *
+        * @param compressionBlockBytes the compression block size, in bytes
+        */
+       public void setCompressionBlockBytes(int compressionBlockBytes) {
+               this.compressionBlockBytes = compressionBlockBytes;
+       }
+
+       /**
      * Specifies where the repository will store data
      *
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 5e4744b..9bbf195 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -17,41 +17,173 @@
 package org.apache.nifi.provenance;
 
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.zip.GZIPInputStream;
 
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.serialization.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader implements RecordReader {
-
-    private final DataInputStream dis;
-    private final ByteCountingInputStream byteCountingIn;
+       private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordReader.class);
+       
+       private final ByteCountingInputStream rawInputStream;
     private final String filename;
     private final int serializationVersion;
+    private final boolean compressed;
+    private final TocReader tocReader;
+    private final int headerLength;
+    
+    private DataInputStream dis;
+    private ByteCountingInputStream byteCountingIn;
+
+    /**
+     * Creates a reader without a Table-of-Contents. Equivalent to
+     * {@code this(in, filename, null)}; block-based navigation (getBlockIndex,
+     * skipToBlock) is therefore unavailable on the resulting reader.
+     */
+    public StandardRecordReader(final InputStream in, final String filename) 
throws IOException {
+       this(in, filename, null);
+    }
+    
+    /**
+     * Creates a reader over a Provenance Log and reads the log header: the repository
+     * class name (readUTF) followed by the serialization version (readInt).
+     * <p>
+     * A filename ending in ".gz" causes the data to be read through a GZIPInputStream.
+     * If a Table-of-Contents is supplied and it reports an offset for block 1, the
+     * initial stream is limited to end at that offset; subsequent blocks are opened
+     * by resetStreamForNextBlock().
+     * NOTE(review): if reading the header fails or the version check throws, this
+     * constructor does not close {@code in} or {@code tocReader}; the caller must.
+     *
+     * @param in the raw Provenance Log stream
+     * @param filename the log's file name; used for ".gz" detection and in messages
+     * @param tocReader the log's Table-of-Contents, or null if none is available
+     * @throws IOException if the header cannot be read
+     * @throws IllegalArgumentException if the serialization version is not in 1-8
+     */
+    public StandardRecordReader(final InputStream in, final String filename, 
final TocReader tocReader) throws IOException {
+       logger.trace("Creating RecordReader for {}", filename);
+       
+       rawInputStream = new ByteCountingInputStream(in);
+
+        // With a TOC, cap the first decoding stream at the start of block 1 so that
+        // block 0 is decoded on its own (each block may be a separate GZIP stream).
+        final InputStream limitedStream;
+        if ( tocReader == null ) {
+               limitedStream = rawInputStream;
+        } else {
+               final long offset1 = tocReader.getBlockOffset(1);
+               if ( offset1 < 0 ) {
+                       limitedStream = rawInputStream;
+               } else {
+                       limitedStream = new LimitingInputStream(rawInputStream, 
offset1 - rawInputStream.getBytesConsumed());
+               }
+        }
+        
+       final InputStream readableStream;
+        if (filename.endsWith(".gz")) {
+            readableStream = new BufferedInputStream(new 
GZIPInputStream(limitedStream));
+            compressed = true;
+        } else {
+            readableStream = new BufferedInputStream(limitedStream);
+            compressed = false;
+        }
 
-    public StandardRecordReader(final InputStream in, final int 
serializationVersion, final String filename) {
-        if (serializationVersion < 1 || serializationVersion > 7) {
-            throw new IllegalArgumentException("Unable to deserialize record 
because the version is " + serializationVersion + " and supported versions are 
1-6");
+        byteCountingIn = new ByteCountingInputStream(readableStream);
+        dis = new DataInputStream(byteCountingIn);
+        
+        final String repoClassName = dis.readUTF();
+        final int serializationVersion = dis.readInt();
+        // readUTF encodes with modified UTF-8; its byte length equals the standard UTF-8
+        // length computed here only when the name has no NUL or supplementary characters.
+        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 
2 + 4;  // 2 bytes for string length, 4 for integer.
+        
+        if (serializationVersion < 1 || serializationVersion > 8) {
+            throw new IllegalArgumentException("Unable to deserialize record 
because the version is " + serializationVersion + " and supported versions are 
1-8");
         }
 
-        byteCountingIn = new ByteCountingInputStream(in);
-        this.dis = new DataInputStream(byteCountingIn);
         this.serializationVersion = serializationVersion;
         this.filename = filename;
+        this.tocReader = tocReader;
+    }
+
+    /**
+     * Moves the reader to the start of the given block. The block's offset is looked up
+     * in the Table-of-Contents, the raw (on-disk) stream is skipped forward to it, and the
+     * decoding stream chain is rebuilt so that reading (and GZIP decompression, if the log
+     * is compressed) starts fresh at that block.
+     *
+     * @param blockIndex the block to move to; must be non-negative
+     * @throws IllegalStateException if no Table-of-Contents is available
+     * @throws IllegalArgumentException if blockIndex is negative
+     * @throws IOException if the block is not listed in the Table-of-Contents or skipping fails
+     */
+    @Override
+    public void skipToBlock(final int blockIndex) throws IOException {
+       if ( tocReader == null ) {
+               throw new IllegalStateException("Cannot skip to block " + 
blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents 
file was found for this Log");
+       }
+       
+       if ( blockIndex < 0 ) {
+               throw new IllegalArgumentException("Cannot skip to block " + 
blockIndex + " because the value is negative");
+       }
+       
+       // Raw stream position already falls within the requested block: nothing to do.
+       if ( blockIndex == getBlockIndex() ) {
+               return;
+       }
+       
+       final long offset = tocReader.getBlockOffset(blockIndex);
+       if ( offset < 0 ) {
+               throw new IOException("Unable to find block " + blockIndex + " 
in Provenance Log " + filename);
+       }
+       
+       final long curOffset = rawInputStream.getBytesConsumed();
+       
+       final long bytesToSkip = offset - curOffset;
+       // NOTE(review): when the target block lies behind the current raw position
+       // (bytesToSkip < 0) nothing happens and no error is raised; the reader stays
+       // where it is, so callers cannot seek backwards with this method.
+       if ( bytesToSkip >= 0 ) {
+               try {
+                       StreamUtils.skip(rawInputStream, bytesToSkip);
+                       logger.debug("Skipped stream from offset {} to {} ({} 
bytes skipped)", curOffset, offset, bytesToSkip);
+               } catch (final IOException e) {
+                       throw new IOException("Failed to skip to offset " + 
offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+               }
+       
+               resetStreamForNextBlock();
+       }
     }
+    
+    /**
+     * Rebuilds the decoding stream chain (limit, optional GZIP, buffer, byte counter)
+     * starting at the raw stream's current position. When a Table-of-Contents is present
+     * and lists an offset for the block after the current one, the new stream is limited
+     * to end at that offset so each block is decoded independently.
+     * The previous wrapper streams are replaced, not closed; NOTE(review): presumably
+     * because closing them would propagate to rawInputStream.
+     *
+     * @throws IOException if the new stream cannot be created; isData() treats an
+     *         EOFException from this method as end of data (for compressed logs,
+     *         presumably raised when GZIPInputStream finds no further header)
+     */
+    private void resetStreamForNextBlock() throws IOException {
+       final InputStream limitedStream;
+        if ( tocReader == null ) {
+               limitedStream = rawInputStream;
+        } else {
+               final long offset = tocReader.getBlockOffset(1 + 
getBlockIndex());
+               if ( offset < 0 ) {
+                       limitedStream = rawInputStream;
+               } else {
+                       limitedStream = new LimitingInputStream(rawInputStream, 
offset - rawInputStream.getBytesConsumed());
+               }
+        }
+       
+       final InputStream readableStream;
+        if (compressed) {
+            readableStream = new BufferedInputStream(new 
GZIPInputStream(limitedStream));
+        } else {
+            readableStream = new BufferedInputStream(limitedStream);
+        }
 
+        // Seed the new counter with the raw position so offsets keep growing across blocks.
+        byteCountingIn = new ByteCountingInputStream(readableStream, 
rawInputStream.getBytesConsumed());
+        dis = new DataInputStream(byteCountingIn);
+    }
+    
+    
+    /**
+     * @return the Table-of-Contents reader supplied at construction, or null if this
+     *         reader was created without one
+     */
+    @Override
+    public TocReader getTocReader() {
+       return tocReader;
+    }
+    
+    /**
+     * @return true if a Table-of-Contents is available, i.e. getBlockIndex() and
+     *         skipToBlock() can be used without an IllegalStateException
+     */
+    @Override
+    public boolean isBlockIndexAvailable() {
+       return tocReader != null;
+    }
+    
+    /**
+     * Returns the index of the block that contains the raw stream's current byte
+     * position, as resolved by the Table-of-Contents.
+     * NOTE(review): the buffering/GZIP layers read ahead of the records handed out, so
+     * the raw position can already be in a later block than the last record returned;
+     * confirm this is acceptable for callers.
+     *
+     * @return the current block index
+     * @throws IllegalStateException if no Table-of-Contents is available
+     */
+    @Override
+    public int getBlockIndex() {
+       if ( tocReader == null ) {
+               throw new IllegalStateException("Cannot determine Block Index 
because no Table-of-Contents could be found for Provenance Log " + filename);
+       }
+       
+       return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+    }
+    
+    /**
+     * Returns the byte count of the current decoding stream.
+     * NOTE(review): after resetStreamForNextBlock() the counter is created with the raw
+     * stream position as (presumably) its starting value, so for compressed logs this
+     * mixes on-disk and decompressed byte counts; verify callers expect that.
+     *
+     * @return bytes consumed as reported by the current ByteCountingInputStream
+     */
+    @Override
+    public long getBytesConsumed() {
+       return byteCountingIn.getBytesConsumed();
+    }
+    
     private StandardProvenanceEventRecord readPreVersion6Record() throws 
IOException {
         final long startOffset = byteCountingIn.getBytesConsumed();
 
-        if (!isData(byteCountingIn)) {
+        if (!isData()) {
             return null;
         }
 
@@ -137,7 +269,7 @@ public class StandardRecordReader implements RecordReader {
 
         final long startOffset = byteCountingIn.getBytesConsumed();
 
-        if (!isData(byteCountingIn)) {
+        if (!isData()) {
             return null;
         }
 
@@ -242,9 +374,17 @@ public class StandardRecordReader implements RecordReader {
     }
 
+    /**
+     * Reads a UUID in the encoding used by this log's serialization version: two longs
+     * (most-significant bits, then least-significant bits) before version 8, and a
+     * readUTF string from version 8 onward.
+     *
+     * @param in the stream positioned at the UUID
+     * @return the UUID in its string form
+     */
     private String readUUID(final DataInputStream in) throws IOException {
-        final long msb = in.readLong();
-        final long lsb = in.readLong();
-        return new UUID(msb, lsb).toString();
+       if ( serializationVersion < 8 ) {
+               final long msb = in.readLong();
+               final long lsb = in.readLong();
+               return new UUID(msb, lsb).toString();
+       } else {
+               // before version 8, we serialized UUIDs as two longs in order 
to
+               // write less data. However, in version 8 we changed to just 
writing
+               // out the string because it's extremely expensive to call 
UUID.fromString.
+               // In the end, since we generally compress, the savings are 
minimal anyway.
+               return in.readUTF();
+       }
     }
 
     private String readNullableString(final DataInputStream in) throws 
IOException {
@@ -272,16 +412,58 @@ public class StandardRecordReader implements RecordReader 
{
         return new String(strBytes, "UTF-8");
     }
 
-    private boolean isData(final InputStream in) throws IOException {
-        in.mark(1);
-        final int nextByte = in.read();
-        in.reset();
+    /**
+     * Peeks one byte to decide whether another record is available. If the current
+     * (block-limited) stream is exhausted, moves on to the next block via
+     * resetStreamForNextBlock() and peeks again; an EOFException from that call
+     * means there is no more data.
+     *
+     * @return true if at least one more byte can be read
+     */
+    private boolean isData() throws IOException {
+        byteCountingIn.mark(1);
+        int nextByte = byteCountingIn.read();
+        byteCountingIn.reset();
+        
+        // Current block exhausted: open the next block (if any) and peek again.
+        if ( nextByte < 0 ) {
+               try {
+                       resetStreamForNextBlock();
+               } catch (final EOFException eof) {
+                       return false;
+               }
+               
+            byteCountingIn.mark(1);
+            nextByte = byteCountingIn.read();
+            byteCountingIn.reset();
+        }
+        
         return (nextByte >= 0);
     }
+    
+    /**
+     * Reads to the end of the log and returns the event ID of the last complete record.
+     * When a Table-of-Contents is available, first skips to the last block so only that
+     * block is read. This consumes the reader.
+     * NOTE(review): returning the last record's ID as the maximum assumes event IDs
+     * increase through the log; confirm for every writer of these files.
+     *
+     * @return the last record's event ID, or -1 if no record could be read
+     * @throws IOException if reading fails for a reason other than a truncated final record
+     */
+    @Override
+    public long getMaxEventId() throws IOException {
+       if ( tocReader != null ) {
+               final long lastBlockOffset = tocReader.getLastBlockOffset();
+               skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+       }
+       
+       ProvenanceEventRecord record;
+       ProvenanceEventRecord lastRecord = null;
+       try {
+               while ((record = nextRecord()) != null) {
+                       lastRecord = record;
+               }
+       } catch (final EOFException eof) {
+               // This can happen if we stop NiFi while the record is being 
written.
+               // This is OK, we just ignore this record. The session will not 
have been
+               // committed, so we can just process the FlowFile again.
+       }
+       
+       return (lastRecord == null) ? -1L : lastRecord.getEventId();
+    }
 
+    /**
+     * Closes the decoding stream, the raw stream and, if present, the Table-of-Contents reader.
+     *
+     * @throws IOException if closing any of them fails
+     */
     @Override
     public void close() throws IOException {
+       logger.trace("Closing Record Reader for {}", filename);
+       
+        // NOTE(review): if dis.close() throws, rawInputStream and tocReader are never
+        // closed; a try/finally would guarantee they are released.
         dis.close();
+        rawInputStream.close();
+        
+        if ( tocReader != null ) {
+               tocReader.close();
+        }
     }
 
     @Override
@@ -291,7 +473,10 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipTo(final long position) throws IOException {
-        final long currentPosition = byteCountingIn.getBytesConsumed();
+       // we are subtracting headerLength from the number of bytes consumed 
because we used to 
+       // consider the offset of the first record "0" - now we consider it 
whatever position it
+       // it really is in the stream.
+        final long currentPosition = byteCountingIn.getBytesConsumed() - 
headerLength;
         if (currentPosition == position) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index df93084..dbb2c48 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -19,38 +19,54 @@ package org.apache.nifi.provenance;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandardRecordWriter implements RecordWriter {
-
+       private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordWriter.class);
+       
     private final File file;
-    private final DataOutputStream out;
-    private final ByteCountingOutputStream byteCountingOut;
     private final FileOutputStream fos;
+    private final ByteCountingOutputStream rawOutStream;
+    private final TocWriter tocWriter;
+    private final boolean compressed;
+    private final int uncompressedBlockSize;
+    
+    private DataOutputStream out;
+    private ByteCountingOutputStream byteCountingOut;
+    private long lastBlockOffset = 0L;
     private int recordCount = 0;
 
     private final Lock lock = new ReentrantLock();
 
-    public StandardRecordWriter(final File file) throws IOException {
+    
+    public StandardRecordWriter(final File file, final TocWriter writer, final 
boolean compressed, final int uncompressedBlockSize) throws IOException {
+       logger.trace("Creating Record Writer for {}", file.getName());
+       
         this.file = file;
+        this.compressed = compressed;
         this.fos = new FileOutputStream(file);
-        this.byteCountingOut = new ByteCountingOutputStream(new 
BufferedOutputStream(fos, 65536));
-        this.out = new DataOutputStream(byteCountingOut);
+        rawOutStream = new ByteCountingOutputStream(fos);
+        this.uncompressedBlockSize = uncompressedBlockSize;
+        
+        this.tocWriter = writer;
     }
 
     static void writeUUID(final DataOutputStream out, final String uuid) 
throws IOException {
-        final UUID uuidObj = UUID.fromString(uuid);
-        out.writeLong(uuidObj.getMostSignificantBits());
-        out.writeLong(uuidObj.getLeastSignificantBits());
+       out.writeUTF(uuid);
     }
 
     static void writeUUIDs(final DataOutputStream out, final 
Collection<String> list) throws IOException {
@@ -69,18 +85,67 @@ public class StandardRecordWriter implements RecordWriter {
         return file;
     }
 
-    @Override
+       @Override
     public synchronized void writeHeader() throws IOException {
+        lastBlockOffset = rawOutStream.getBytesWritten();
+        resetWriteStream();
+        
         out.writeUTF(PersistentProvenanceRepository.class.getName());
         out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
         out.flush();
     }
+    
+    private void resetWriteStream() throws IOException {
+       if ( out != null ) {
+               out.flush();
+       }
+
+       final long byteOffset = (byteCountingOut == null) ? 
rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+       
+       final OutputStream writableStream;
+       if ( compressed ) {
+               // because of the way that GZIPOutputStream works, we need to 
call close() on it in order for it
+               // to write its trailing bytes. But we don't want to close the 
underlying OutputStream, so we wrap
+               // the underlying OutputStream in a NonCloseableOutputStream
+               if ( out != null ) {
+                       out.close();
+               }
+
+               if ( tocWriter != null ) {
+                       
tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+               }
+
+               writableStream = new BufferedOutputStream(new 
GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+       } else {
+               if ( tocWriter != null ) {
+                       
tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+               }
+
+               writableStream = new BufferedOutputStream(rawOutStream, 65536);
+       }
+       
+        this.byteCountingOut = new ByteCountingOutputStream(writableStream, 
byteOffset);
+        this.out = new DataOutputStream(byteCountingOut);
+    }
+    
 
     @Override
     public synchronized long writeRecord(final ProvenanceEventRecord record, 
long recordIdentifier) throws IOException {
         final ProvenanceEventType recordType = record.getEventType();
         final long startBytes = byteCountingOut.getBytesWritten();
 
+        // add a new block to the TOC if needed.
+        if ( tocWriter != null && (startBytes - lastBlockOffset >= 
uncompressedBlockSize) ) {
+               lastBlockOffset = startBytes;
+               
+               if ( compressed ) {
+                       // because of the way that GZIPOutputStream works, we 
need to call close() on it in order for it
+                       // to write its trailing bytes. But we don't want to 
close the underlying OutputStream, so we wrap
+                       // the underlying OutputStream in a 
NonCloseableOutputStream
+                       resetWriteStream();
+               }
+        }
+        
         out.writeLong(recordIdentifier);
         out.writeUTF(record.getEventType().name());
         out.writeLong(record.getEventTime());
@@ -196,13 +261,24 @@ public class StandardRecordWriter implements RecordWriter 
{
 
     @Override
     public synchronized void close() throws IOException {
+       logger.trace("Closing Record Writer for {}", file.getName());
+       
         lock();
         try {
-            out.flush();
-            out.close();
+               try {
+                       out.flush();
+                       out.close();
+               } finally {
+                       rawOutStream.close();
+            
+                   if ( tocWriter != null ) {
+                       tocWriter.close();
+                   }
+               }
         } finally {
             unlock();
         }
+        
     }
 
     @Override
@@ -232,6 +308,14 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public void sync() throws IOException {
-        fos.getFD().sync();
+       if ( tocWriter != null ) {
+               tocWriter.sync();
+       }
+       fos.getFD().sync();
+    }
+    
+    @Override
+    public TocWriter getTocWriter() {
+       return tocWriter;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 4608419..7db04aa 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -16,25 +16,17 @@
  */
 package org.apache.nifi.provenance.lucene;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
 import org.apache.nifi.provenance.IndexConfiguration;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +35,12 @@ public class DeleteIndexAction implements ExpirationAction {
     private static final Logger logger = 
LoggerFactory.getLogger(DeleteIndexAction.class);
     private final PersistentProvenanceRepository repository;
     private final IndexConfiguration indexConfiguration;
+    private final IndexManager indexManager;
 
-    public DeleteIndexAction(final PersistentProvenanceRepository repo, final 
IndexConfiguration indexConfiguration) {
+    public DeleteIndexAction(final PersistentProvenanceRepository repo, final 
IndexConfiguration indexConfiguration, final IndexManager indexManager) {
         this.repository = repo;
         this.indexConfiguration = indexConfiguration;
+        this.indexManager = indexManager;
     }
 
     @Override
@@ -55,51 +49,38 @@ public class DeleteIndexAction implements ExpirationAction {
         long numDeleted = 0;
         long maxEventId = -1L;
         try (final RecordReader reader = 
RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
-            try {
-                StandardProvenanceEventRecord record;
-                while ((record = reader.nextRecord()) != null) {
-                    numDeleted++;
-
-                    if (record.getEventId() > maxEventId) {
-                        maxEventId = record.getEventId();
-                    }
-                }
-            } catch (final EOFException eof) {
-                // finished reading -- the last record was not completely 
written out, so it is discarded.
-            }
-        } catch (final EOFException eof) {
-            // no data in file.
-            return expiredFile;
+               maxEventId = reader.getMaxEventId();
+        } catch (final IOException ioe) {
+               logger.warn("Failed to obtain max ID present in journal file 
{}", expiredFile.getAbsolutePath());
         }
 
         // remove the records from the index
         final List<File> indexDirs = 
indexConfiguration.getIndexDirectories(expiredFile);
         for (final File indexingDirectory : indexDirs) {
-            try (final Directory directory = 
FSDirectory.open(indexingDirectory);
-                    final Analyzer analyzer = new StandardAnalyzer()) {
-                IndexWriterConfig config = new 
IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-                config.setWriteLockTimeout(300000L);
-
-                Term term = new Term(FieldNames.STORAGE_FILENAME, 
LuceneUtil.substringBefore(expiredFile.getName(), "."));
+            final Term term = new Term(FieldNames.STORAGE_FILENAME, 
LuceneUtil.substringBefore(expiredFile.getName(), "."));
 
-                boolean deleteDir = false;
-                try (final IndexWriter indexWriter = new 
IndexWriter(directory, config)) {
-                    indexWriter.deleteDocuments(term);
-                    indexWriter.commit();
-                    final int docsLeft = indexWriter.numDocs();
-                    deleteDir = (docsLeft <= 0);
-                    logger.debug("After expiring {}, there are {} docs left 
for index {}", expiredFile, docsLeft, indexingDirectory);
-                }
+            boolean deleteDir = false;
+            final IndexWriter writer = 
indexManager.borrowIndexWriter(indexingDirectory);
+            try {
+                writer.deleteDocuments(term);
+                writer.commit();
+                final int docsLeft = writer.numDocs();
+                deleteDir = (docsLeft <= 0);
+                logger.debug("After expiring {}, there are {} docs left for 
index {}", expiredFile, docsLeft, indexingDirectory);
+            } finally {
+               indexManager.returnIndexWriter(indexingDirectory, writer);
+            }
 
-                // we've confirmed that all documents have been removed. 
Delete the index directory.
-                if (deleteDir) {
-                    indexConfiguration.removeIndexDirectory(indexingDirectory);
-                    deleteDirectory(indexingDirectory);
-                    logger.info("Removed empty index directory {}", 
indexingDirectory);
-                }
+            // we've confirmed that all documents have been removed. Delete 
the index directory.
+            if (deleteDir) {
+               indexManager.removeIndex(indexingDirectory);
+                indexConfiguration.removeIndexDirectory(indexingDirectory);
+                
+                deleteDirectory(indexingDirectory);
+                logger.info("Removed empty index directory {}", 
indexingDirectory);
             }
         }
-
+        
         // Update the minimum index to 1 more than the max Event ID in this 
file.
         if (maxEventId > -1L) {
             indexConfiguration.setMinIdIndexed(maxEventId + 1L);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 6446a35..5a77f42 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -23,23 +23,30 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
+import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DocsReader {
-
+       private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+       
     public DocsReader(final List<File> storageDirectories) {
     }
 
@@ -48,6 +55,7 @@ public class DocsReader {
             return Collections.emptySet();
         }
 
+        final long start = System.nanoTime();
         final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
         final List<Document> docs = new ArrayList<>(numDocs);
 
@@ -60,63 +68,102 @@ public class DocsReader {
             }
         }
 
+        final long readDocuments = System.nanoTime() - start;
+        logger.debug("Reading {} Lucene Documents took {} millis", 
docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
         return read(docs, allProvenanceLogFiles);
     }
 
+    
+    private long getByteOffset(final Document d, final RecordReader reader) {
+        final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField != null ) {
+               final int blockIndex = blockField.numericValue().intValue();
+               final TocReader tocReader = reader.getTocReader();
+               return tocReader.getBlockOffset(blockIndex);
+        }
+        
+       return 
d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+    }
+    
+    
+    private ProvenanceEventRecord getRecord(final Document d, final 
RecordReader reader) throws IOException {
+       IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+       if ( blockField == null ) {
+               reader.skipTo(getByteOffset(d, reader));
+       } else {
+               reader.skipToBlock(blockField.numericValue().intValue());
+       }
+       
+        StandardProvenanceEventRecord record;
+        while ( (record = reader.nextRecord()) != null) {
+               IndexableField idField = 
d.getField(SearchableFields.Identifier.getSearchableFieldName());
+               if ( idField == null || idField.numericValue().longValue() == 
record.getEventId() ) {
+                       break;
+               }
+        }
+        
+        if ( record == null ) {
+               throw new IOException("Failed to find Provenance Event " + d);
+        } else {
+               return record;
+        }
+    }
+    
+
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final 
Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
 
         RecordReader reader = null;
         String lastStorageFilename = null;
-        long lastByteOffset = 0L;
         final Set<ProvenanceEventRecord> matchingRecords = new 
LinkedHashSet<>();
 
+        final long start = System.nanoTime();
+        int logFileCount = 0;
+        
+        final Set<String> storageFilesToSkip = new HashSet<>();
+        
         try {
             for (final Document d : docs) {
                 final String storageFilename = 
d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                final long byteOffset = 
d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
-
+                if ( storageFilesToSkip.contains(storageFilename) ) {
+                       continue;
+                }
+                
                 try {
-                    if (reader != null && 
storageFilename.equals(lastStorageFilename) && byteOffset > lastByteOffset) {
-                        // Still the same file and the offset is downstream.
-                        try {
-                            reader.skipTo(byteOffset);
-                            final StandardProvenanceEventRecord record = 
reader.nextRecord();
-                            matchingRecords.add(record);
-                        } catch (final IOException e) {
-                            throw new FileNotFoundException("Could not find 
Provenance Log File with basename " + storageFilename + " in the Provenance 
Repository");
-                        }
-
+                    if (reader != null && 
storageFilename.equals(lastStorageFilename)) {
+                               matchingRecords.add(getRecord(d, reader));
                     } else {
+                       logger.debug("Opening log file {}", storageFilename);
+                       
+                       logFileCount++;
                         if (reader != null) {
                             reader.close();
                         }
 
                         List<File> potentialFiles = 
LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
-                            throw new FileNotFoundException("Could not find 
Provenance Log File with basename " + storageFilename + " in the Provenance 
Repository");
+                            logger.warn("Could not find Provenance Log File 
with basename {} in the "
+                                       + "Provenance Repository; assuming file 
has expired and continuing without it", storageFilename);
+                            storageFilesToSkip.add(storageFilename);
+                            continue;
                         }
 
                         if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple 
Provenance Log Files with basename " + storageFilename + " in the Provenance 
Repository");
+                            throw new FileNotFoundException("Found multiple 
Provenance Log Files with basename " + 
+                                       storageFilename + " in the Provenance 
Repository");
                         }
 
                         for (final File file : potentialFiles) {
-                            reader = RecordReaders.newRecordReader(file, 
allProvenanceLogFiles);
-
                             try {
-                                reader.skip(byteOffset);
-
-                                final StandardProvenanceEventRecord record = 
reader.nextRecord();
-                                matchingRecords.add(record);
+                               reader = RecordReaders.newRecordReader(file, 
allProvenanceLogFiles);
+                                       matchingRecords.add(getRecord(d, 
reader));
                             } catch (final IOException e) {
-                                throw new IOException("Failed to retrieve 
record from Provenance File " + file + " due to " + e, e);
+                                throw new IOException("Failed to retrieve 
record " + d + " from Provenance File " + file + " due to " + e, e);
                             }
                         }
                     }
                 } finally {
                     lastStorageFilename = storageFilename;
-                    lastByteOffset = byteOffset;
                 }
             }
         } finally {
@@ -125,6 +172,9 @@ public class DocsReader {
             }
         }
 
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+        logger.debug("Took {} ms to read {} events from {} prov log files", 
millis, matchingRecords.size(), logFileCount);
+
         return matchingRecords;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
index 6afc193..90a73f4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
@@ -20,4 +20,5 @@ public class FieldNames {
 
     public static final String STORAGE_FILENAME = "storage-filename";
     public static final String STORAGE_FILE_OFFSET = "storage-fileOffset";
+    public static final String BLOCK_INDEX = "block-index";
 }

Reply via email to