NIFI-527: Code cleanup

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3cd18b0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3cd18b0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3cd18b0b

Branch: refs/heads/NIFI-292
Commit: 3cd18b0babc5133e35a2771bc0d0acaf974c381f
Parents: 666de3d
Author: Mark Payne <[email protected]>
Authored: Mon Apr 27 14:13:55 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Apr 27 14:13:55 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/IndexConfiguration.java     |  12 +-
 .../PersistentProvenanceRepository.java         | 612 +++++++-------
 .../provenance/RepositoryConfiguration.java     | 106 +--
 .../nifi/provenance/StandardRecordReader.java   | 246 +++---
 .../nifi/provenance/StandardRecordWriter.java   | 138 ++--
 .../provenance/expiration/ExpirationAction.java |   6 +-
 .../provenance/lucene/DeleteIndexAction.java    |  12 +-
 .../nifi/provenance/lucene/DocsReader.java      |  79 +-
 .../nifi/provenance/lucene/IndexManager.java    | 820 +++++++++----------
 .../nifi/provenance/lucene/IndexSearch.java     |  38 +-
 .../nifi/provenance/lucene/IndexingAction.java  | 119 +--
 .../nifi/provenance/lucene/LineageQuery.java    |   6 +-
 .../nifi/provenance/lucene/LuceneUtil.java      |  38 +-
 .../provenance/rollover/CompressionAction.java  |  59 --
 .../provenance/rollover/RolloverAction.java     |  35 -
 .../provenance/serialization/RecordReader.java  |  57 +-
 .../provenance/serialization/RecordReaders.java | 136 +--
 .../provenance/serialization/RecordWriter.java  |  23 +-
 .../provenance/serialization/RecordWriters.java |   8 +-
 .../nifi/provenance/toc/StandardTocReader.java  |  44 +-
 .../nifi/provenance/toc/StandardTocWriter.java  |  35 +-
 .../apache/nifi/provenance/toc/TocReader.java   |  20 +-
 .../org/apache/nifi/provenance/toc/TocUtil.java |  27 +-
 .../apache/nifi/provenance/toc/TocWriter.java   |  16 +-
 .../TestPersistentProvenanceRepository.java     | 118 +--
 .../TestStandardRecordReaderWriter.java         | 162 ++--
 .../org/apache/nifi/provenance/TestUtil.java    |   2 +-
 .../provenance/toc/TestStandardTocReader.java   |  20 +-
 .../provenance/toc/TestStandardTocWriter.java   |   4 +-
 29 files changed, 1391 insertions(+), 1607 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index a5474d5..3beab65 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -92,7 +92,7 @@ public class IndexConfiguration {
             }
             return firstRecord.getEventTime();
         } catch (final FileNotFoundException | EOFException fnf) {
-            return null;       // file no longer exists or there's no record in this file
+            return null; // file no longer exists or there's no record in this file
         } catch (final IOException ioe) {
             logger.warn("Failed to read first entry in file {} due to {}", 
provenanceLogFile, ioe.toString());
             logger.warn("", ioe);
@@ -201,7 +201,8 @@ public class IndexConfiguration {
      * desired
      * @param endTime the end time of the query for which the indices are
      * desired
-     * @return
+     * @return the index directories that are applicable only for the given time
+     * span (times inclusive).
      */
     public List<File> getIndexDirectories(final Long startTime, final Long 
endTime) {
         if (startTime == null && endTime == null) {
@@ -252,7 +253,8 @@ public class IndexConfiguration {
      *
      * @param provenanceLogFile the provenance log file for which the index
      * directories are desired
-     * @return
+     * @return the index directories that are applicable only for the given
+     * event log
      */
     public List<File> getIndexDirectories(final File provenanceLogFile) {
         final List<File> dirs = new ArrayList<>();
@@ -334,9 +336,7 @@ public class IndexConfiguration {
     }
 
     /**
-     * Returns the amount of disk space in bytes used by all of the indices
-     *
-     * @return
+     * @return the amount of disk space in bytes used by all of the indices
      */
     public long getIndexSize() {
         lock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 48cc164..fe89a5e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -139,7 +139,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
     private final List<ExpirationAction> expirationActions = new ArrayList<>();
 
-    private final IndexingAction indexingAction;
     private final ConcurrentMap<String, AsyncQuerySubmission> 
querySubmissionMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, AsyncLineageSubmission> 
lineageSubmissionMap = new ConcurrentHashMap<>();
 
@@ -151,7 +150,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     private final AtomicBoolean repoDirty = new AtomicBoolean(false);
-    // we keep the last 1000 records on hand so that when the UI is opened and 
it asks for the last 1000 records we don't need to 
+    // we keep the last 1000 records on hand so that when the UI is opened and 
it asks for the last 1000 records we don't need to
     // read them. Since this is a very cheap operation to keep them, it's 
worth the tiny expense for the improved user experience.
     private final RingBuffer<ProvenanceEventRecord> latestRecords = new 
RingBuffer<>(1000);
     private EventReporter eventReporter;
@@ -184,13 +183,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         this.indexManager = new IndexManager();
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
-        
-        final List<SearchableField> fields = 
configuration.getSearchableFields();
-        if (fields != null && !fields.isEmpty()) {
-            indexingAction = new IndexingAction(this, indexConfig);
-        } else {
-            indexingAction = null;
-        }
 
         scheduledExecService = Executors.newScheduledThreadPool(3, new 
NamedThreadFactory("Provenance Maintenance Thread"));
         queryExecService = 
Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new 
NamedThreadFactory("Provenance Query Thread"));
@@ -205,69 +197,69 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
     @Override
     public void initialize(final EventReporter eventReporter) throws 
IOException {
-       writeLock.lock();
-       try {
-               if (initialized.getAndSet(true)) {
-                   return;
-               }
-       
-               this.eventReporter = eventReporter;
-       
-               recover();
-       
-               if (configuration.isAllowRollover()) {
-                   writers = createWriters(configuration, idGenerator.get());
-               }
-       
-               if (configuration.isAllowRollover()) {
-                   scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-                       @Override
-                       public void run() {
-                           // Check if we need to roll over
-                           if (needToRollover()) {
-                               // it appears that we do need to roll over. 
Obtain write lock so that we can do so, and then
-                               // confirm that we still need to.
-                               writeLock.lock();
-                               try {
-                                   logger.debug("Obtained write lock to 
perform periodic rollover");
-       
-                                   if (needToRollover()) {
-                                       try {
-                                           rollover(false);
-                                       } catch (final Exception e) {
-                                           logger.error("Failed to roll over 
Provenance Event Log due to {}", e.toString());
-                                           logger.error("", e);
-                                       }
-                                   }
-                               } finally {
-                                   writeLock.unlock();
-                               }
-                           }
-                       }
-                   }, rolloverCheckMillis, rolloverCheckMillis, 
TimeUnit.MILLISECONDS);
-       
-                   scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
-                   scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-                       @Override
-                       public void run() {
-                           try {
-                               purgeOldEvents();
-                           } catch (final Exception e) {
-                               logger.error("Failed to purge old events from 
Provenance Repo due to {}", e.toString());
-                               if (logger.isDebugEnabled()) {
-                                   logger.error("", e);
-                               }
-                               eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + 
e.toString());
-                           }
-                       }
-                   }, 1L, 1L, TimeUnit.MINUTES);
-       
-                   expirationActions.add(new DeleteIndexAction(this, 
indexConfig, indexManager));
-                   expirationActions.add(new FileRemovalAction());
-               }
-       } finally {
-               writeLock.unlock();
-       }
+        writeLock.lock();
+        try {
+            if (initialized.getAndSet(true)) {
+                return;
+            }
+
+            this.eventReporter = eventReporter;
+
+            recover();
+
+            if (configuration.isAllowRollover()) {
+                writers = createWriters(configuration, idGenerator.get());
+            }
+
+            if (configuration.isAllowRollover()) {
+                scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Check if we need to roll over
+                        if (needToRollover()) {
+                            // it appears that we do need to roll over. Obtain 
write lock so that we can do so, and then
+                            // confirm that we still need to.
+                            writeLock.lock();
+                            try {
+                                logger.debug("Obtained write lock to perform 
periodic rollover");
+
+                                if (needToRollover()) {
+                                    try {
+                                        rollover(false);
+                                    } catch (final Exception e) {
+                                        logger.error("Failed to roll over 
Provenance Event Log due to {}", e.toString());
+                                        logger.error("", e);
+                                    }
+                                }
+                            } finally {
+                                writeLock.unlock();
+                            }
+                        }
+                    }
+                }, rolloverCheckMillis, rolloverCheckMillis, 
TimeUnit.MILLISECONDS);
+
+                scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+                scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            purgeOldEvents();
+                        } catch (final Exception e) {
+                            logger.error("Failed to purge old events from 
Provenance Repo due to {}", e.toString());
+                            if (logger.isDebugEnabled()) {
+                                logger.error("", e);
+                            }
+                            eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + 
e.toString());
+                        }
+                    }
+                }, 1L, 1L, TimeUnit.MINUTES);
+
+                expirationActions.add(new DeleteIndexAction(this, indexConfig, 
indexManager));
+                expirationActions.add(new FileRemovalAction());
+            }
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     private static RepositoryConfiguration createRepositoryConfiguration() 
throws IOException {
@@ -489,28 +481,26 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 maxIdFile = file;
             }
 
-            if (firstId > maxIndexedId && indexingAction != null && 
indexingAction.hasBeenPerformed(file)) {
+            if (firstId > maxIndexedId) {
                 maxIndexedId = firstId - 1;
             }
 
-            if (firstId < minIndexedId && indexingAction != null && 
indexingAction.hasBeenPerformed(file)) {
+            if (firstId < minIndexedId) {
                 minIndexedId = firstId;
             }
         }
 
         if (maxIdFile != null) {
-            final boolean lastFileIndexed = indexingAction == null ? false : 
indexingAction.hasBeenPerformed(maxIdFile);
-
             // Determine the max ID in the last file.
             try (final RecordReader reader = 
RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
-               final long eventId = reader.getMaxEventId();
+                final long eventId = reader.getMaxEventId();
                 if (eventId > maxId) {
                     maxId = eventId;
                 }
 
                 // If the ID is greater than the max indexed id and this file 
was indexed, then
                 // update the max indexed id
-                if (eventId > maxIndexedId && lastFileIndexed) {
+                if (eventId > maxIndexedId) {
                     maxIndexedId = eventId;
                 }
             } catch (final IOException ioe) {
@@ -567,7 +557,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             // Read the records in the last file to find its max id
             if (greatestMinIdFile != null) {
                 try (final RecordReader recordReader = 
RecordReaders.newRecordReader(greatestMinIdFile, 
Collections.<Path>emptyList())) {
-                       maxId = recordReader.getMaxEventId();
+                    maxId = recordReader.getMaxEventId();
                 }
             }
 
@@ -604,11 +594,11 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             queryExecService.shutdownNow();
 
             indexManager.close();
-            
+
             if ( writers != null ) {
-                   for (final RecordWriter writer : writers) {
-                       writer.close();
-                   }
+                for (final RecordWriter writer : writers) {
+                    writer.close();
+                }
             }
         } finally {
             writeLock.unlock();
@@ -624,7 +614,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         readLock.lock();
         try {
             if (repoDirty.get()) {
-                logger.debug("Cannot persist provenance record because there 
was an IOException last time a record persistence was attempted. Will not 
attempt to persist more records until the repo has been rolled over.");
+                logger.debug("Cannot persist provenance record because there 
was an IOException last time a record persistence was attempted. "
+                        + "Will not attempt to persist more records until the 
repo has been rolled over.");
                 return;
             }
 
@@ -670,7 +661,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             } catch (final IOException ioe) {
                 logger.error("Failed to persist Provenance Event due to {}. 
Will not attempt to write to the Provenance Repository again until the 
repository has rolled over.", ioe.toString());
                 logger.error("", ioe);
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to persist Provenance Event due to " + ioe.toString() + ". Will not 
attempt to write to the Provenance Repository again until the repository has 
rolled over");
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to persist Provenance Event due to " + ioe.toString() +
+                        ". Will not attempt to write to the Provenance 
Repository again until the repository has rolled over");
 
                 // Switch from readLock to writeLock so that we can perform 
rollover
                 readLock.unlock();
@@ -735,9 +727,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     /**
      * Returns the size, in bytes, of the Repository storage
      *
-     * @param logFiles
-     * @param timeCutoff
-     * @return
+     * @param logFiles the log files to consider
+     * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped
+     * @return the size of all log files given whose last mod date comes after (or equal to) timeCutoff
      */
     public long getSize(final List<File> logFiles, final long timeCutoff) {
         long bytesUsed = 0L;
@@ -760,7 +752,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     /**
      * Purges old events from the repository
      *
-     * @throws IOException
+     * @throws IOException if unable to purge old events due to an I/O problem
      */
     void purgeOldEvents() throws IOException {
         while (!recoveryFinished.get()) {
@@ -858,12 +850,16 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
                 removed.add(baseName);
             } catch (final FileNotFoundException fnf) {
-                logger.warn("Failed to perform Expiration Action {} on 
Provenance Event file {} because the file no longer exists; will not perform 
additional Expiration Actions on this file", currentAction, file);
+                logger.warn("Failed to perform Expiration Action {} on 
Provenance Event file {} because the file no longer exists; will not "
+                        + "perform additional Expiration Actions on this 
file", currentAction, file);
                 removed.add(baseName);
             } catch (final Throwable t) {
-                logger.warn("Failed to perform Expiration Action {} on 
Provenance Event file {} due to {}; will not perform additional Expiration 
Actions on this file at this time", currentAction, file, t.toString());
+                logger.warn("Failed to perform Expiration Action {} on 
Provenance Event file {} due to {}; will not perform additional "
+                        + "Expiration Actions on this file at this time", 
currentAction, file, t.toString());
                 logger.warn("", t);
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to perform Expiration Action " + currentAction + " on Provenance Event 
file " + file + " due to " + t.toString() + "; will not perform additional 
Expiration Actions on this file at this time");
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to perform Expiration Action " + currentAction +
+                        " on Provenance Event file " + file + " due to " + 
t.toString() + "; will not perform additional Expiration Actions " +
+                        "on this file at this time");
             }
         }
 
@@ -906,24 +902,24 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
     // made protected for testing purposes
     protected int getJournalCount() {
-       // determine how many 'journals' we have in the journals directories
+        // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
         for ( final File storageDir : configuration.getStorageDirectories() ) {
-               final File journalsDir = new File(storageDir, "journals");
-               final File[] journalFiles = journalsDir.listFiles();
-               if ( journalFiles != null ) {
-                       journalFileCount += journalFiles.length;
-               }
+            final File journalsDir = new File(storageDir, "journals");
+            final File[] journalFiles = journalsDir.listFiles();
+            if ( journalFiles != null ) {
+                journalFileCount += journalFiles.length;
+            }
         }
-        
+
         return journalFileCount;
     }
-    
+
     /**
      * MUST be called with the write lock held
      *
-     * @param force
-     * @throws IOException
+     * @param force if true, will force a rollover regardless of whether or not data has been written
+     * @throws IOException if unable to complete rollover
      */
     private void rollover(final boolean force) throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -938,44 +934,44 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 final File writerFile = writer.getFile();
                 journalsToMerge.add(writerFile);
                 try {
-                       writer.close();
+                    writer.close();
                 } catch (final IOException ioe) {
-                       logger.warn("Failed to close {} due to {}", writer, 
ioe.toString());
-                       if ( logger.isDebugEnabled() ) {
-                               logger.warn("", ioe);
-                       }
+                    logger.warn("Failed to close {} due to {}", writer, 
ioe.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
                 }
             }
             if ( logger.isDebugEnabled() ) {
-               logger.debug("Going to merge {} files for journals starting 
with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+                logger.debug("Going to merge {} files for journals starting 
with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
             int journalFileCount = getJournalCount();
             final int journalCountThreshold = configuration.getJournalCount() 
* 5;
             if ( journalFileCount > journalCountThreshold ) {
-               logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
-                               + "Slowing down flow to accomodate. Currently, 
there are {} journal files and "
-                               + "threshold for blocking is {}", 
journalFileCount, journalCountThreshold);
-               eventReporter.reportEvent(Severity.WARNING, "Provenance 
Repository", "The rate of the dataflow is "
-                               + "exceeding the provenance recording 
rate. Slowing down flow to accomodate");
-               
-               while (journalFileCount > journalCountThreshold) {
-                       try {
-                               Thread.sleep(1000L);
-                       } catch (final InterruptedException ie) {
-                       }
-                       
-                       logger.debug("Provenance Repository is still behind. 
Keeping flow slowed down "
-                                       + "to accomodate. Currently, there are 
{} journal files and "
-                                       + "threshold for blocking is {}", 
journalFileCount, journalCountThreshold);
-
-                       journalFileCount = getJournalCount();
-               }
-               
-               logger.info("Provenance Repository has no caught up with 
rolling over journal files. Current number of "
-                               + "journal files to be rolled over is {}", 
journalFileCount);
-            }
-            
+                logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
+                        + "Slowing down flow to accomodate. Currently, there 
are {} journal files and "
+                        + "threshold for blocking is {}", journalFileCount, 
journalCountThreshold);
+                eventReporter.reportEvent(Severity.WARNING, "Provenance 
Repository", "The rate of the dataflow is "
+                        + "exceeding the provenance recording rate. Slowing 
down flow to accomodate");
+
+                while (journalFileCount > journalCountThreshold) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                    }
+
+                    logger.debug("Provenance Repository is still behind. 
Keeping flow slowed down "
+                            + "to accomodate. Currently, there are {} journal 
files and "
+                            + "threshold for blocking is {}", 
journalFileCount, journalCountThreshold);
+
+                    journalFileCount = getJournalCount();
+                }
+
+                logger.info("Provenance Repository has no caught up with 
rolling over journal files. Current number of "
+                        + "journal files to be rolled over is {}", 
journalFileCount);
+            }
+
             writers = createWriters(configuration, idGenerator.get());
             streamStartTime.set(System.currentTimeMillis());
             recordsWrittenSinceRollover.getAndSet(0);
@@ -989,24 +985,24 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
-                       try {
-                           final File fileRolledOver;
-       
-                           try {
-                               fileRolledOver = mergeJournals(journalsToMerge, 
storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, 
latestRecords);
-                               repoDirty.set(false);
-                           } catch (final IOException ioe) {
-                               repoDirty.set(true);
-                               logger.error("Failed to merge Journal Files {} 
into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                               logger.error("", ioe);
-                               return;
-                           }
-       
-                           if (fileRolledOver == null) {
-                               return;
-                           }
-                           File file = fileRolledOver;
-       
+                    try {
+                        final File fileRolledOver;
+
+                        try {
+                            fileRolledOver = mergeJournals(journalsToMerge, 
storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, 
latestRecords);
+                            repoDirty.set(false);
+                        } catch (final IOException ioe) {
+                            repoDirty.set(true);
+                            logger.error("Failed to merge Journal Files {} 
into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                            logger.error("", ioe);
+                            return;
+                        }
+
+                        if (fileRolledOver == null) {
+                            return;
+                        }
+                        File file = fileRolledOver;
+
                         // update our map of id to Path
                         // need lock to update the map, even though it's an 
AtomicReference, AtomicReference allows those doing a
                         // get() to obtain the most up-to-date version but we 
use a writeLock to prevent multiple threads modifying
@@ -1021,24 +1017,24 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         } finally {
                             writeLock.unlock();
                         }
-       
-                           logger.info("Successfully Rolled over Provenance 
Event file containing {} records", recordsWritten);
-                           rolloverCompletions.getAndIncrement();
-                           
-                           // We have finished successfully. Cancel the future 
so that we don't run anymore
-                           Future<?> future;
-                           while ((future = futureReference.get()) == null) {
-                               try {
-                                       Thread.sleep(10L);
-                               } catch (final InterruptedException ie) {
-                               }
-                           }
-                           
-                           future.cancel(false);
-                       } catch (final Throwable t) {
-                               logger.error("Failed to rollover Provenance 
repository due to {}", t.toString());
-                               logger.error("", t);
-                       }
+
+                        logger.info("Successfully Rolled over Provenance Event 
file containing {} records", recordsWritten);
+                        rolloverCompletions.getAndIncrement();
+
+                        // We have finished successfully. Cancel the future so 
that we don't run anymore
+                        Future<?> future;
+                        while ((future = futureReference.get()) == null) {
+                            try {
+                                Thread.sleep(10L);
+                            } catch (final InterruptedException ie) {
+                            }
+                        }
+
+                        future.cancel(false);
+                    } catch (final Throwable t) {
+                        logger.error("Failed to rollover Provenance repository 
due to {}", t.toString());
+                        logger.error("", t);
+                    }
                 }
             };
 
@@ -1074,10 +1070,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
 
             for (final File journalFile : journalFiles) {
-               if ( journalFile.isDirectory() ) {
-                       continue;
-               }
-               
+                if ( journalFile.isDirectory() ) {
+                    continue;
+                }
+
                 final String basename = 
LuceneUtil.substringBefore(journalFile.getName(), ".");
                 List<File> files = journalMap.get(basename);
                 if (files == null) {
@@ -1120,83 +1116,84 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return mergedFile;
     }
 
-    File mergeJournals(final List<File> journalFiles, final File storageDir, 
final File mergedFile, final EventReporter eventReporter, final 
RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
-       logger.debug("Merging {} to {}", journalFiles, mergedFile);
-       if ( this.closed ) {
-               logger.info("Provenance Repository has been closed; will not 
merge journal files to {}", mergedFile);
-               return null;
-       }
-       
+    File mergeJournals(final List<File> journalFiles, final File storageDir, 
final File mergedFile, final EventReporter eventReporter,
+            final RingBuffer<ProvenanceEventRecord> ringBuffer) throws 
IOException {
+        logger.debug("Merging {} to {}", journalFiles, mergedFile);
+        if ( this.closed ) {
+            logger.info("Provenance Repository has been closed; will not merge 
journal files to {}", mergedFile);
+            return null;
+        }
+
         if (journalFiles.isEmpty()) {
             return null;
         }
 
         Collections.sort(journalFiles, new Comparator<File>() {
-                       @Override
-                       public int compare(final File o1, final File o2) {
-                               final String suffix1 = 
LuceneUtil.substringAfterLast(o1.getName(), ".");
-                               final String suffix2 = 
LuceneUtil.substringAfterLast(o2.getName(), ".");
-
-                               try {
-                                       final int journalIndex1 = 
Integer.parseInt(suffix1);
-                                       final int journalIndex2 = 
Integer.parseInt(suffix2);
-                                       return Integer.compare(journalIndex1, 
journalIndex2);
-                               } catch (final NumberFormatException nfe) {
-                                       return 
o1.getName().compareTo(o2.getName());
-                               }
-                       }
+            @Override
+            public int compare(final File o1, final File o2) {
+                final String suffix1 = 
LuceneUtil.substringAfterLast(o1.getName(), ".");
+                final String suffix2 = 
LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+                try {
+                    final int journalIndex1 = Integer.parseInt(suffix1);
+                    final int journalIndex2 = Integer.parseInt(suffix2);
+                    return Integer.compare(journalIndex1, journalIndex2);
+                } catch (final NumberFormatException nfe) {
+                    return o1.getName().compareTo(o2.getName());
+                }
+            }
         });
-        
+
         final String firstJournalFile = journalFiles.get(0).getName();
         final String firstFileSuffix = 
LuceneUtil.substringAfterLast(firstJournalFile, ".");
         final boolean allPartialFiles = firstFileSuffix.equals("0");
-        
+
         // check if we have all of the "partial" files for the journal.
         if (allPartialFiles) {
-               if ( mergedFile.exists() ) {
-                       // we have all "partial" files and there is already a 
merged file. Delete the data from the index
-                       // because the merge file may not be fully merged. We 
will re-merge.
-                       logger.warn("Merged Journal File {} already exists; 
however, all partial journal files also exist "
-                                       + "so assuming that the merge did not 
finish. Repeating procedure in order to ensure consistency.");
-                       
-                       final DeleteIndexAction deleteAction = new 
DeleteIndexAction(this, indexConfig, indexManager);
-                       try {
-                               deleteAction.execute(mergedFile);
-                       } catch (final Exception e) {
-                               logger.warn("Failed to delete records from 
Journal File {} from the index; this could potentially result in duplicates. 
Failure was due to {}", mergedFile, e.toString());
-                               if ( logger.isDebugEnabled() ) {
-                                       logger.warn("", e);
-                               }
-                       }
-
-                       // Since we only store the file's basename, block 
offset, and event ID, and because the newly created file could end up on
-                       // a different Storage Directory than the original, we 
need to ensure that we delete both the partially merged
-                       // file and the TOC file. Otherwise, we could get the 
wrong copy and have issues retrieving events.
-                       if ( !mergedFile.delete() ) {
-                               logger.error("Failed to delete partially 
written Provenance Journal File {}. This may result in events from this journal 
"
-                                               + "file not being able to be 
displayed. This file should be deleted manually.", mergedFile);
-                       }
-                       
-                       final File tocFile = TocUtil.getTocFile(mergedFile);
-                       if ( tocFile.exists() && !tocFile.delete() ) {
-                               logger.error("Failed to delete .toc file {}; 
this may result in not being able to read the Provenance Events from the {} 
Journal File. "
-                                               + "This can be corrected by 
manually deleting the {} file", tocFile, mergedFile, tocFile);
-                       }
-               }
+            if ( mergedFile.exists() ) {
+                // we have all "partial" files and there is already a merged 
file. Delete the data from the index
+                // because the merge file may not be fully merged. We will 
re-merge.
+                logger.warn("Merged Journal File {} already exists; however, 
all partial journal files also exist "
+                        + "so assuming that the merge did not finish. 
Repeating procedure in order to ensure consistency.");
+
+                final DeleteIndexAction deleteAction = new 
DeleteIndexAction(this, indexConfig, indexManager);
+                try {
+                    deleteAction.execute(mergedFile);
+                } catch (final Exception e) {
+                    logger.warn("Failed to delete records from Journal File {} 
from the index; this could potentially result in duplicates. Failure was due to 
{}", mergedFile, e.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", e);
+                    }
+                }
+
+                // Since we only store the file's basename, block offset, and 
event ID, and because the newly created file could end up on
+                // a different Storage Directory than the original, we need to 
ensure that we delete both the partially merged
+                // file and the TOC file. Otherwise, we could get the wrong 
copy and have issues retrieving events.
+                if ( !mergedFile.delete() ) {
+                    logger.error("Failed to delete partially written 
Provenance Journal File {}. This may result in events from this journal "
+                            + "file not being able to be displayed. This file 
should be deleted manually.", mergedFile);
+                }
+
+                final File tocFile = TocUtil.getTocFile(mergedFile);
+                if ( tocFile.exists() && !tocFile.delete() ) {
+                    logger.error("Failed to delete .toc file {}; this may 
result in not being able to read the Provenance Events from the {} Journal 
File. "
+                            + "This can be corrected by manually deleting the 
{} file", tocFile, mergedFile, tocFile);
+                }
+            }
         } else {
-               logger.warn("Cannot merge journal files {} because expected 
first file to end with extension '.0' "
-                               + "but it did not; assuming that the files were 
already merged but only some finished deletion "
-                               + "before restart. Deleting remaining partial 
journal files.", journalFiles);
-               
-               for ( final File file : journalFiles ) {
-                       if ( !file.delete() && file.exists() ) {
-                               logger.warn("Failed to delete unneeded journal 
file {}; this file should be cleaned up manually", file);
-                       }
-               }
-               
-               return null;
-        }
-        
+            logger.warn("Cannot merge journal files {} because expected first 
file to end with extension '.0' "
+                    + "but it did not; assuming that the files were already 
merged but only some finished deletion "
+                    + "before restart. Deleting remaining partial journal 
files.", journalFiles);
+
+            for ( final File file : journalFiles ) {
+                if ( !file.delete() && file.exists() ) {
+                    logger.warn("Failed to delete unneeded journal file {}; 
this file should be cleaned up manually", file);
+                }
+            }
+
+            return null;
+        }
+
         final long startNanos = System.nanoTime();
 
         // Map each journal to a RecordReader
@@ -1241,12 +1238,14 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     record = reader.nextRecord();
                 } catch (final EOFException eof) {
                 } catch (final Exception e) {
-                    logger.warn("Failed to generate Provenance Event Record 
from Journal due to " + e + "; it's possible that the record wasn't completely 
written to the file. This record will be skipped.");
+                    logger.warn("Failed to generate Provenance Event Record 
from Journal due to " + e + "; it's possible that the record wasn't "
+                            + "completely written to the file. This record 
will be skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
 
-                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + 
e + "; it's possible that hte record wasn't completely written to the file. 
This record will be skipped.");
+                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + 
e +
+                            "; it's possible that hte record wasn't completely 
written to the file. This record will be skipped.");
                 }
 
                 if (record == null) {
@@ -1261,47 +1260,47 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             try (final RecordWriter writer = 
RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), 
true)) {
                 writer.writeHeader();
 
-                final IndexingAction indexingAction = new IndexingAction(this, 
indexConfig);
-                
+                final IndexingAction indexingAction = new IndexingAction(this);
+
                 final File indexingDirectory = 
indexConfig.getWritableIndexDirectory(writerFile);
                 final IndexWriter indexWriter = 
indexManager.borrowIndexWriter(indexingDirectory);
                 try {
-                       long maxId = 0L;
-                       
-                       while (!recordToReaderMap.isEmpty()) {
-                           final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                           final StandardProvenanceEventRecord record = 
entry.getKey();
-                           final RecordReader reader = entry.getValue();
-       
-                           writer.writeRecord(record, record.getEventId());
-                           final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
-                           
-                           indexingAction.index(record, indexWriter, 
blockIndex);
-                           maxId = record.getEventId();
-                           
-                           ringBuffer.add(record);
-                           records++;
-       
-                           // Remove this entry from the map
-                           recordToReaderMap.remove(record);
-       
-                           // Get the next entry from this reader and add it 
to the map
-                           StandardProvenanceEventRecord nextRecord = null;
-       
-                           try {
-                               nextRecord = reader.nextRecord();
-                           } catch (final EOFException eof) {
-                           }
-       
-                           if (nextRecord != null) {
-                               recordToReaderMap.put(nextRecord, reader);
-                           }
-                       }
-                       
-                       indexWriter.commit();
-                       indexConfig.setMaxIdIndexed(maxId);
+                    long maxId = 0L;
+
+                    while (!recordToReaderMap.isEmpty()) {
+                        final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                        final StandardProvenanceEventRecord record = 
entry.getKey();
+                        final RecordReader reader = entry.getValue();
+
+                        writer.writeRecord(record, record.getEventId());
+                        final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
+
+                        indexingAction.index(record, indexWriter, blockIndex);
+                        maxId = record.getEventId();
+
+                        ringBuffer.add(record);
+                        records++;
+
+                        // Remove this entry from the map
+                        recordToReaderMap.remove(record);
+
+                        // Get the next entry from this reader and add it to 
the map
+                        StandardProvenanceEventRecord nextRecord = null;
+
+                        try {
+                            nextRecord = reader.nextRecord();
+                        } catch (final EOFException eof) {
+                        }
+
+                        if (nextRecord != null) {
+                            recordToReaderMap.put(nextRecord, reader);
+                        }
+                    }
+
+                    indexWriter.commit();
+                    indexConfig.setMaxIdIndexed(maxId);
                 } finally {
-                       indexManager.returnIndexWriter(indexingDirectory, 
indexWriter);
+                    indexManager.returnIndexWriter(indexingDirectory, 
indexWriter);
                 }
             }
         } finally {
@@ -1319,7 +1318,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 logger.warn("Failed to remove temporary journal file {}; this 
file should be cleaned up manually", journalFile.getAbsolutePath());
                 eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; 
this file should be cleaned up manually");
             }
-            
+
             final File tocFile = TocUtil.getTocFile(journalFile);
             if (!tocFile.delete() && tocFile.exists()) {
                 logger.warn("Failed to remove temporary journal TOC file {}; 
this file should be cleaned up manually", tocFile.getAbsolutePath());
@@ -1374,7 +1373,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     public QuerySubmission submitQuery(final Query query) {
         final int numQueries = querySubmissionMap.size();
         if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
-            throw new IllegalStateException("Cannot process query because 
there are currently " + numQueries + " queries whose results have not been 
deleted due to poorly behaving clients not issuing DELETE requests. Please try 
again later.");
+            throw new IllegalStateException("Cannot process query because 
there are currently " + numQueries + " queries whose results have not "
+                    + "been deleted due to poorly behaving clients not issuing 
DELETE requests. Please try again later.");
         }
 
         if (query.getEndDate() != null && query.getStartDate() != null && 
query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1416,7 +1416,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         final AtomicInteger retrievalCount = new AtomicInteger(0);
         final List<File> indexDirectories = indexConfig.getIndexDirectories(
                 query.getStartDate() == null ? null : 
query.getStartDate().getTime(),
-                query.getEndDate() == null ? null : 
query.getEndDate().getTime());
+                        query.getEndDate() == null ? null : 
query.getEndDate().getTime());
         final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 
indexDirectories.size());
         querySubmissionMap.put(query.getIdentifier(), result);
 
@@ -1432,11 +1432,11 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     }
 
     /**
-     * REMOVE-ME: This is for testing only and can be removed.
+     * This is for testing only and not actually used other than in debugging
      *
-     * @param luceneQuery
-     * @return
-     * @throws IOException
+     * @param luceneQuery the lucene query to execute
+     * @return an Iterator of ProvenanceEventRecord that match the query
+     * @throws IOException if unable to perform the query
      */
     public Iterator<ProvenanceEventRecord> queryLucene(final 
org.apache.lucene.search.Query luceneQuery) throws IOException {
         final List<File> indexFiles = indexConfig.getIndexDirectories();
@@ -1601,7 +1601,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return computeLineage(Collections.<String>singleton(flowFileUuid), 
LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
-    private Lineage computeLineage(final Collection<String> flowFileUuids, 
final LineageComputationType computationType, final Long eventId, final Long 
startTimestamp, final Long endTimestamp) throws IOException {
+    private Lineage computeLineage(final Collection<String> flowFileUuids, 
final LineageComputationType computationType, final Long eventId, final Long 
startTimestamp,
+            final Long endTimestamp) throws IOException {
         final AsyncLineageSubmission submission = 
submitLineageComputation(flowFileUuids, computationType, eventId, 
startTimestamp, endTimestamp);
         final StandardLineageResult result = submission.getResult();
         while (!result.isFinished()) {
@@ -1623,7 +1624,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return submitLineageComputation(Collections.singleton(flowFileUuid), 
LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
-    private AsyncLineageSubmission submitLineageComputation(final 
Collection<String> flowFileUuids, final LineageComputationType computationType, 
final Long eventId, final long startTimestamp, final long endTimestamp) {
+    private AsyncLineageSubmission submitLineageComputation(final 
Collection<String> flowFileUuids, final LineageComputationType computationType,
+            final Long eventId, final long startTimestamp, final long 
endTimestamp) {
         final List<File> indexDirs = 
indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
         final AsyncLineageSubmission result = new 
AsyncLineageSubmission(computationType, eventId, flowFileUuids, 
indexDirs.size());
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
@@ -1647,16 +1649,16 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
 
             switch (event.getEventType()) {
-                case CLONE:
-                case FORK:
-                case JOIN:
-                case REPLAY:
-                    return submitLineageComputation(event.getChildUuids(), 
LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), 
Long.MAX_VALUE);
-                default:
-                    final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1);
-                    
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                    submission.getResult().setError("Event ID " + eventId + " 
indicates an event of type " + event.getEventType() + " so its children cannot 
be expanded");
-                    return submission;
+            case CLONE:
+            case FORK:
+            case JOIN:
+            case REPLAY:
+                return submitLineageComputation(event.getChildUuids(), 
LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), 
Long.MAX_VALUE);
+            default:
+                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
+                submission.getResult().setError("Event ID " + eventId + " 
indicates an event of type " + event.getEventType() + " so its children cannot 
be expanded");
+                return submission;
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1);
@@ -1684,17 +1686,17 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
 
             switch (event.getEventType()) {
-                case JOIN:
-                case FORK:
-                case CLONE:
-                case REPLAY:
-                    return submitLineageComputation(event.getParentUuids(), 
LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
-                default: {
-                    final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String>emptyList(), 1);
-                    
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                    submission.getResult().setError("Event ID " + eventId + " 
indicates an event of type " + event.getEventType() + " so its parents cannot 
be expanded");
-                    return submission;
-                }
+            case JOIN:
+            case FORK:
+            case CLONE:
+            case REPLAY:
+                return submitLineageComputation(event.getParentUuids(), 
LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
+            default: {
+                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String>emptyList(), 1);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
+                submission.getResult().setError("Event ID " + eventId + " 
indicates an event of type " + event.getEventType() + " so its parents cannot 
be expanded");
+                return submission;
+            }
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String>emptyList(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 3951591..d0d147c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -34,7 +34,7 @@ public class RepositoryConfiguration {
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
     private int compressionBlockBytes = 1024 * 1024;
-    
+
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
@@ -50,19 +50,19 @@ public class RepositoryConfiguration {
         return allowRollover;
     }
 
-    
+
     public int getCompressionBlockBytes() {
-               return compressionBlockBytes;
-       }
+        return compressionBlockBytes;
+    }
 
-       public void setCompressionBlockBytes(int compressionBlockBytes) {
-               this.compressionBlockBytes = compressionBlockBytes;
-       }
+    public void setCompressionBlockBytes(int compressionBlockBytes) {
+        this.compressionBlockBytes = compressionBlockBytes;
+    }
 
-       /**
+    /**
      * Specifies where the repository will store data
      *
-     * @return
+     * @return the directories where provenance files will be stored
      */
     public List<File> getStorageDirectories() {
         return Collections.unmodifiableList(storageDirectories);
@@ -71,18 +71,15 @@ public class RepositoryConfiguration {
     /**
      * Specifies where the repository should store data
      *
-     * @param storageDirectory
+     * @param storageDirectory the directory to store provenance files
      */
     public void addStorageDirectory(final File storageDirectory) {
         this.storageDirectories.add(storageDirectory);
     }
 
     /**
-     * Returns the minimum amount of time that a given record will stay in the
-     * repository
-     *
-     * @param timeUnit
-     * @return
+     * @param timeUnit the desired time unit
+     * @return the max amount of time that a given record will stay in the 
repository
      */
     public long getMaxRecordLife(final TimeUnit timeUnit) {
         return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
@@ -91,8 +88,8 @@ public class RepositoryConfiguration {
     /**
      * Specifies how long a record should stay in the repository
      *
-     * @param maxRecordLife
-     * @param timeUnit
+     * @param maxRecordLife the max amount of time to keep a record in the repo
+     * @param timeUnit the period of time used by maxRecordLife
      */
     public void setMaxRecordLife(final long maxRecordLife, final TimeUnit 
timeUnit) {
         this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, 
timeUnit);
@@ -101,7 +98,7 @@ public class RepositoryConfiguration {
     /**
      * Returns the maximum amount of data to store in the repository (in bytes)
      *
-     * @return
+     * @return the maximum amount of disk space to use for the prov repo
      */
     public long getMaxStorageCapacity() {
         return storageCapacity;
@@ -109,107 +106,91 @@ public class RepositoryConfiguration {
 
     /**
      * Sets the maximum amount of data to store in the repository (in bytes)
-     * @param maxStorageCapacity
+     *
+     * @param maxStorageCapacity the maximum amount of disk space to use for 
the prov repo
      */
     public void setMaxStorageCapacity(final long maxStorageCapacity) {
         this.storageCapacity = maxStorageCapacity;
     }
 
     /**
-     * Returns the maximum amount of time to write to a single event file
-     *
-     * @param timeUnit
-     * @return
+     * @param timeUnit the desired time unit for the returned value
+     * @return the maximum amount of time that the repo will write to a single 
event file
      */
     public long getMaxEventFileLife(final TimeUnit timeUnit) {
         return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Sets the maximum amount of time to write to a single event file
-     *
-     * @param maxEventFileTime
-     * @param timeUnit
+     * @param maxEventFileTime the max amount of time to write to a single 
event file
+     * @param timeUnit the units for the value supplied by maxEventFileTime
      */
     public void setMaxEventFileLife(final long maxEventFileTime, final 
TimeUnit timeUnit) {
         this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, 
timeUnit);
     }
 
     /**
-     * Returns the maximum number of bytes (pre-compression) that will be
+     * @return the maximum number of bytes (pre-compression) that will be
      * written to a single event file before the file is rolled over
-     *
-     * @return
      */
     public long getMaxEventFileCapacity() {
         return eventFileBytes;
     }
 
     /**
-     * Sets the maximum number of bytes (pre-compression) that will be written
+     * @param maxEventFileBytes the maximum number of bytes (pre-compression) 
that will be written
      * to a single event file before the file is rolled over
-     *
-     * @param maxEventFileBytes
      */
     public void setMaxEventFileCapacity(final long maxEventFileBytes) {
         this.eventFileBytes = maxEventFileBytes;
     }
 
     /**
-     * Returns the fields that can be indexed
-     *
-     * @return
+     * @return the fields that should be indexed
      */
     public List<SearchableField> getSearchableFields() {
         return Collections.unmodifiableList(searchableFields);
     }
 
     /**
-     * Sets the fields to index
-     *
-     * @param searchableFields
+     * @param searchableFields the fields to index
      */
     public void setSearchableFields(final List<SearchableField> 
searchableFields) {
         this.searchableFields = new ArrayList<>(searchableFields);
     }
 
     /**
-     * Returns the FlowFile attributes that can be indexed
-     *
-     * @return
+     * @return the FlowFile attributes that should be indexed
      */
     public List<SearchableField> getSearchableAttributes() {
         return Collections.unmodifiableList(searchableAttributes);
     }
 
     /**
-     * Sets the FlowFile attributes to index
-     *
-     * @param searchableAttributes
+     * @param searchableAttributes the FlowFile attributes to index
      */
     public void setSearchableAttributes(final List<SearchableField> 
searchableAttributes) {
         this.searchableAttributes = new ArrayList<>(searchableAttributes);
     }
 
     /**
-     * Indicates whether or not event files will be compressed when they are
+     * @return whether or not event files will be compressed when they are
      * rolled over
-     *
-     * @return
      */
     public boolean isCompressOnRollover() {
         return compress;
     }
 
     /**
-     * Specifies whether or not to compress event files on rollover
-     *
-     * @param compress
+     * @param compress if true, the data will be compressed when rolled over
      */
     public void setCompressOnRollover(final boolean compress) {
         this.compress = compress;
     }
 
+    /**
+     * @return the number of threads to use to query the repo
+     */
     public int getQueryThreadPoolSize() {
         return queryThreadPoolSize;
     }
@@ -246,27 +227,23 @@ public class RepositoryConfiguration {
      * </li>
      * </ol>
      *
-     * @param bytes
+     * @param bytes the number of bytes to write to an index before beginning 
a new shard
      */
     public void setDesiredIndexSize(final long bytes) {
         this.desiredIndexBytes = bytes;
     }
 
     /**
-     * Returns the desired size of each index shard. See the
-     * {@Link #setDesiredIndexSize} method for an explanation of why we choose
+     * @return the desired size of each index shard. See the
+     * {@link #setDesiredIndexSize} method for an explanation of why we choose
      * to shard the index.
-     *
-     * @return
      */
     public long getDesiredIndexSize() {
         return desiredIndexBytes;
     }
 
     /**
-     * Sets the number of Journal files to use when persisting records.
-     *
-     * @param numJournals
+     * @param numJournals the number of Journal files to use when persisting 
records.
      */
     public void setJournalCount(final int numJournals) {
         if (numJournals < 1) {
@@ -277,19 +254,14 @@ public class RepositoryConfiguration {
     }
 
     /**
-     * Returns the number of Journal files that will be used when persisting
-     * records.
-     *
-     * @return
+     * @return the number of Journal files that will be used when persisting 
records.
      */
     public int getJournalCount() {
         return journalCount;
     }
 
     /**
-     * Specifies whether or not the Repository should sync all updates to disk.
-     *
-     * @return
+     * @return <code>true</code> if the repository will perform an 'fsync' for 
all updates to disk
      */
     public boolean isAlwaysSync() {
         return alwaysSync;
@@ -301,7 +273,7 @@ public class RepositoryConfiguration {
      * persisted across restarted, even if there is a power failure or a sudden
      * Operating System crash, but it can be very expensive.
      *
-     * @param alwaysSync
+     * @param alwaysSync whether or not to perform an 'fsync' for all updates 
to disk
      */
     public void setAlwaysSync(boolean alwaysSync) {
         this.alwaysSync = alwaysSync;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 9bbf195..ca0d5ed 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -39,40 +39,40 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader implements RecordReader {
-       private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordReader.class);
-       
-       private final ByteCountingInputStream rawInputStream;
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordReader.class);
+
+    private final ByteCountingInputStream rawInputStream;
     private final String filename;
     private final int serializationVersion;
     private final boolean compressed;
     private final TocReader tocReader;
     private final int headerLength;
-    
+
     private DataInputStream dis;
     private ByteCountingInputStream byteCountingIn;
 
     public StandardRecordReader(final InputStream in, final String filename) 
throws IOException {
-       this(in, filename, null);
+        this(in, filename, null);
     }
-    
+
     public StandardRecordReader(final InputStream in, final String filename, 
final TocReader tocReader) throws IOException {
-       logger.trace("Creating RecordReader for {}", filename);
-       
-       rawInputStream = new ByteCountingInputStream(in);
+        logger.trace("Creating RecordReader for {}", filename);
+
+        rawInputStream = new ByteCountingInputStream(in);
 
         final InputStream limitedStream;
         if ( tocReader == null ) {
-               limitedStream = rawInputStream;
+            limitedStream = rawInputStream;
         } else {
-               final long offset1 = tocReader.getBlockOffset(1);
-               if ( offset1 < 0 ) {
-                       limitedStream = rawInputStream;
-               } else {
-                       limitedStream = new LimitingInputStream(rawInputStream, 
offset1 - rawInputStream.getBytesConsumed());
-               }
-        }
-        
-       final InputStream readableStream;
+            final long offset1 = tocReader.getBlockOffset(1);
+            if ( offset1 < 0 ) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, 
offset1 - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
         if (filename.endsWith(".gz")) {
             readableStream = new BufferedInputStream(new 
GZIPInputStream(limitedStream));
             compressed = true;
@@ -83,11 +83,11 @@ public class StandardRecordReader implements RecordReader {
 
         byteCountingIn = new ByteCountingInputStream(readableStream);
         dis = new DataInputStream(byteCountingIn);
-        
+
         final String repoClassName = dis.readUTF();
         final int serializationVersion = dis.readInt();
-        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 
2 + 4;  // 2 bytes for string length, 4 for integer.
-        
+        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 
2 + 4; // 2 bytes for string length, 4 for integer.
+
         if (serializationVersion < 1 || serializationVersion > 8) {
             throw new IllegalArgumentException("Unable to deserialize record 
because the version is " + serializationVersion + " and supported versions are 
1-8");
         }
@@ -99,52 +99,52 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipToBlock(final int blockIndex) throws IOException {
-       if ( tocReader == null ) {
-               throw new IllegalStateException("Cannot skip to block " + 
blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents 
file was found for this Log");
-       }
-       
-       if ( blockIndex < 0 ) {
-               throw new IllegalArgumentException("Cannot skip to block " + 
blockIndex + " because the value is negative");
-       }
-       
-       if ( blockIndex == getBlockIndex() ) {
-               return;
-       }
-       
-       final long offset = tocReader.getBlockOffset(blockIndex);
-       if ( offset < 0 ) {
-               throw new IOException("Unable to find block " + blockIndex + " 
in Provenance Log " + filename);
-       }
-       
-       final long curOffset = rawInputStream.getBytesConsumed();
-       
-       final long bytesToSkip = offset - curOffset;
-       if ( bytesToSkip >= 0 ) {
-               try {
-                       StreamUtils.skip(rawInputStream, bytesToSkip);
-                       logger.debug("Skipped stream from offset {} to {} ({} 
bytes skipped)", curOffset, offset, bytesToSkip);
-               } catch (final IOException e) {
-                       throw new IOException("Failed to skip to offset " + 
offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
-               }
-       
-               resetStreamForNextBlock();
-       }
+        if ( tocReader == null ) {
+            throw new IllegalStateException("Cannot skip to block " + 
blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents 
file was found for this Log");
+        }
+
+        if ( blockIndex < 0 ) {
+            throw new IllegalArgumentException("Cannot skip to block " + 
blockIndex + " because the value is negative");
+        }
+
+        if ( blockIndex == getBlockIndex() ) {
+            return;
+        }
+
+        final long offset = tocReader.getBlockOffset(blockIndex);
+        if ( offset < 0 ) {
+            throw new IOException("Unable to find block " + blockIndex + " in 
Provenance Log " + filename);
+        }
+
+        final long curOffset = rawInputStream.getBytesConsumed();
+
+        final long bytesToSkip = offset - curOffset;
+        if ( bytesToSkip >= 0 ) {
+            try {
+                StreamUtils.skip(rawInputStream, bytesToSkip);
+                logger.debug("Skipped stream from offset {} to {} ({} bytes 
skipped)", curOffset, offset, bytesToSkip);
+            } catch (final IOException e) {
+                throw new IOException("Failed to skip to offset " + offset + " 
for block " + blockIndex + " of Provenance Log " + filename, e);
+            }
+
+            resetStreamForNextBlock();
+        }
     }
-    
+
     private void resetStreamForNextBlock() throws IOException {
-       final InputStream limitedStream;
+        final InputStream limitedStream;
         if ( tocReader == null ) {
-               limitedStream = rawInputStream;
+            limitedStream = rawInputStream;
         } else {
-               final long offset = tocReader.getBlockOffset(1 + 
getBlockIndex());
-               if ( offset < 0 ) {
-                       limitedStream = rawInputStream;
-               } else {
-                       limitedStream = new LimitingInputStream(rawInputStream, 
offset - rawInputStream.getBytesConsumed());
-               }
-        }
-       
-       final InputStream readableStream;
+            final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+            if ( offset < 0 ) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset 
- rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
         if (compressed) {
             readableStream = new BufferedInputStream(new 
GZIPInputStream(limitedStream));
         } else {
@@ -154,32 +154,32 @@ public class StandardRecordReader implements RecordReader 
{
         byteCountingIn = new ByteCountingInputStream(readableStream, 
rawInputStream.getBytesConsumed());
         dis = new DataInputStream(byteCountingIn);
     }
-    
-    
+
+
     @Override
     public TocReader getTocReader() {
-       return tocReader;
+        return tocReader;
     }
-    
+
     @Override
     public boolean isBlockIndexAvailable() {
-       return tocReader != null;
+        return tocReader != null;
     }
-    
+
     @Override
     public int getBlockIndex() {
-       if ( tocReader == null ) {
-               throw new IllegalStateException("Cannot determine Block Index 
because no Table-of-Contents could be found for Provenance Log " + filename);
-       }
-       
-       return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+        if ( tocReader == null ) {
+            throw new IllegalStateException("Cannot determine Block Index 
because no Table-of-Contents could be found for Provenance Log " + filename);
+        }
+
+        return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
     }
-    
+
     @Override
     public long getBytesConsumed() {
-       return byteCountingIn.getBytesConsumed();
+        return byteCountingIn.getBytesConsumed();
     }
-    
+
     private StandardProvenanceEventRecord readPreVersion6Record() throws 
IOException {
         final long startOffset = byteCountingIn.getBytesConsumed();
 
@@ -374,17 +374,17 @@ public class StandardRecordReader implements RecordReader 
{
     }
 
     private String readUUID(final DataInputStream in) throws IOException {
-       if ( serializationVersion < 8 ) {
-               final long msb = in.readLong();
-               final long lsb = in.readLong();
-               return new UUID(msb, lsb).toString();
-       } else {
-               // before version 8, we serialized UUID's as two longs in order 
to
-               // write less data. However, in version 8 we changed to just 
writing
-               // out the string because it's extremely expensive to call 
UUID.fromString.
-               // In the end, since we generally compress, the savings in 
minimal anyway.
-               return in.readUTF();
-       }
+        if ( serializationVersion < 8 ) {
+            final long msb = in.readLong();
+            final long lsb = in.readLong();
+            return new UUID(msb, lsb).toString();
+        } else {
+            // before version 8, we serialized UUIDs as two longs in order to
+            // write less data. However, in version 8 we changed to just 
writing
+            // out the string because it's extremely expensive to call 
UUID.fromString.
+            // In the end, since we generally compress, the savings are minimal 
anyway.
+            return in.readUTF();
+        }
     }
 
     private String readNullableString(final DataInputStream in) throws 
IOException {
@@ -416,53 +416,53 @@ public class StandardRecordReader implements RecordReader 
{
         byteCountingIn.mark(1);
         int nextByte = byteCountingIn.read();
         byteCountingIn.reset();
-        
+
         if ( nextByte < 0 ) {
-               try {
-                       resetStreamForNextBlock();
-               } catch (final EOFException eof) {
-                       return false;
-               }
-               
+            try {
+                resetStreamForNextBlock();
+            } catch (final EOFException eof) {
+                return false;
+            }
+
             byteCountingIn.mark(1);
             nextByte = byteCountingIn.read();
             byteCountingIn.reset();
         }
-        
+
         return (nextByte >= 0);
     }
-    
+
     @Override
     public long getMaxEventId() throws IOException {
-       if ( tocReader != null ) {
-               final long lastBlockOffset = tocReader.getLastBlockOffset();
-               skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
-       }
-       
-       ProvenanceEventRecord record;
-       ProvenanceEventRecord lastRecord = null;
-       try {
-               while ((record = nextRecord()) != null) {
-                       lastRecord = record;
-               }
-       } catch (final EOFException eof) {
-               // This can happen if we stop NIFi while the record is being 
written.
-               // This is OK, we just ignore this record. The session will not 
have been
-               // committed, so we can just process the FlowFile again.
-       }
-       
-       return (lastRecord == null) ? -1L : lastRecord.getEventId();
+        if ( tocReader != null ) {
+            final long lastBlockOffset = tocReader.getLastBlockOffset();
+            skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+        }
+
+        ProvenanceEventRecord record;
+        ProvenanceEventRecord lastRecord = null;
+        try {
+            while ((record = nextRecord()) != null) {
+                lastRecord = record;
+            }
+        } catch (final EOFException eof) {
+            // This can happen if we stop NiFi while the record is being 
written.
+            // This is OK, we just ignore this record. The session will not 
have been
+            // committed, so we can just process the FlowFile again.
+        }
+
+        return (lastRecord == null) ? -1L : lastRecord.getEventId();
     }
 
     @Override
     public void close() throws IOException {
-       logger.trace("Closing Record Reader for {}", filename);
-       
+        logger.trace("Closing Record Reader for {}", filename);
+
         dis.close();
         rawInputStream.close();
-        
+
         if ( tocReader != null ) {
-               tocReader.close();
+            tocReader.close();
         }
     }
 
@@ -473,9 +473,9 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipTo(final long position) throws IOException {
-       // we are subtracting headerLength from the number of bytes consumed 
because we used to 
-       // consider the offset of the first record "0" - now we consider it 
whatever position it
-       // it really is in the stream.
+        // we are subtracting headerLength from the number of bytes consumed 
because we used to
+        // consider the offset of the first record "0" - now we consider it 
whatever position it
+        // really is in the stream.
         final long currentPosition = byteCountingIn.getBytesConsumed() - 
headerLength;
         if (currentPosition == position) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index dbb2c48..3095f13 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -36,15 +36,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordWriter implements RecordWriter {
-       private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordWriter.class);
-       
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordWriter.class);
+
     private final File file;
     private final FileOutputStream fos;
     private final ByteCountingOutputStream rawOutStream;
     private final TocWriter tocWriter;
     private final boolean compressed;
     private final int uncompressedBlockSize;
-    
+
     private DataOutputStream out;
     private ByteCountingOutputStream byteCountingOut;
     private long lastBlockOffset = 0L;
@@ -52,21 +52,21 @@ public class StandardRecordWriter implements RecordWriter {
 
     private final Lock lock = new ReentrantLock();
 
-    
+
     public StandardRecordWriter(final File file, final TocWriter writer, final 
boolean compressed, final int uncompressedBlockSize) throws IOException {
-       logger.trace("Creating Record Writer for {}", file.getName());
-       
+        logger.trace("Creating Record Writer for {}", file.getName());
+
         this.file = file;
         this.compressed = compressed;
         this.fos = new FileOutputStream(file);
         rawOutStream = new ByteCountingOutputStream(fos);
         this.uncompressedBlockSize = uncompressedBlockSize;
-        
+
         this.tocWriter = writer;
     }
 
     static void writeUUID(final DataOutputStream out, final String uuid) 
throws IOException {
-       out.writeUTF(uuid);
+        out.writeUTF(uuid);
     }
 
     static void writeUUIDs(final DataOutputStream out, final 
Collection<String> list) throws IOException {
@@ -85,49 +85,49 @@ public class StandardRecordWriter implements RecordWriter {
         return file;
     }
 
-       @Override
+    @Override
     public synchronized void writeHeader() throws IOException {
         lastBlockOffset = rawOutStream.getBytesWritten();
         resetWriteStream();
-        
+
         out.writeUTF(PersistentProvenanceRepository.class.getName());
         out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
         out.flush();
     }
-    
+
     private void resetWriteStream() throws IOException {
-       if ( out != null ) {
-               out.flush();
-       }
-
-       final long byteOffset = (byteCountingOut == null) ? 
rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
-       
-       final OutputStream writableStream;
-       if ( compressed ) {
-               // because of the way that GZIPOutputStream works, we need to 
call close() on it in order for it
-               // to write its trailing bytes. But we don't want to close the 
underlying OutputStream, so we wrap
-               // the underlying OutputStream in a NonCloseableOutputStream
-               if ( out != null ) {
-                       out.close();
-               }
-
-               if ( tocWriter != null ) {
-                       
tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
-               }
-
-               writableStream = new BufferedOutputStream(new 
GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
-       } else {
-               if ( tocWriter != null ) {
-                       
tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
-               }
-
-               writableStream = new BufferedOutputStream(rawOutStream, 65536);
-       }
-       
+        if ( out != null ) {
+            out.flush();
+        }
+
+        final long byteOffset = (byteCountingOut == null) ? 
rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+
+        final OutputStream writableStream;
+        if ( compressed ) {
+            // because of the way that GZIPOutputStream works, we need to call 
close() on it in order for it
+            // to write its trailing bytes. But we don't want to close the 
underlying OutputStream, so we wrap
+            // the underlying OutputStream in a NonCloseableOutputStream
+            if ( out != null ) {
+                out.close();
+            }
+
+            if ( tocWriter != null ) {
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+            }
+
+            writableStream = new BufferedOutputStream(new GZIPOutputStream(new 
NonCloseableOutputStream(rawOutStream), 1), 65536);
+        } else {
+            if ( tocWriter != null ) {
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+            }
+
+            writableStream = new BufferedOutputStream(rawOutStream, 65536);
+        }
+
         this.byteCountingOut = new ByteCountingOutputStream(writableStream, 
byteOffset);
         this.out = new DataOutputStream(byteCountingOut);
     }
-    
+
 
     @Override
     public synchronized long writeRecord(final ProvenanceEventRecord record, 
long recordIdentifier) throws IOException {
@@ -136,16 +136,16 @@ public class StandardRecordWriter implements RecordWriter 
{
 
         // add a new block to the TOC if needed.
         if ( tocWriter != null && (startBytes - lastBlockOffset >= 
uncompressedBlockSize) ) {
-               lastBlockOffset = startBytes;
-               
-               if ( compressed ) {
-                       // because of the way that GZIPOutputStream works, we 
need to call close() on it in order for it
-                       // to write its trailing bytes. But we don't want to 
close the underlying OutputStream, so we wrap
-                       // the underlying OutputStream in a 
NonCloseableOutputStream
-                       resetWriteStream();
-               }
+            lastBlockOffset = startBytes;
+
+            if ( compressed ) {
+                // because of the way that GZIPOutputStream works, we need to 
call close() on it in order for it
+                // to write its trailing bytes. But we don't want to close the 
underlying OutputStream, so we wrap
+                // the underlying OutputStream in a NonCloseableOutputStream
+                resetWriteStream();
+            }
         }
-        
+
         out.writeLong(recordIdentifier);
         out.writeUTF(record.getEventType().name());
         out.writeLong(record.getEventTime());
@@ -175,7 +175,7 @@ public class StandardRecordWriter implements RecordWriter {
             writeLongNullableString(out, entry.getValue());
         }
 
-        // If Content Claim Info is present, write out a 'TRUE' followed by 
claim info. Else, write out 'false'. 
+        // If Content Claim Info is present, write out a 'TRUE' followed by 
claim info. Else, write out 'false'.
         if (record.getContentClaimSection() != null && 
record.getContentClaimContainer() != null && record.getContentClaimIdentifier() 
!= null) {
             out.writeBoolean(true);
             out.writeUTF(record.getContentClaimContainer());
@@ -261,24 +261,24 @@ public class StandardRecordWriter implements RecordWriter 
{
 
     @Override
     public synchronized void close() throws IOException {
-       logger.trace("Closing Record Writer for {}", file.getName());
-       
+        logger.trace("Closing Record Writer for {}", file.getName());
+
         lock();
         try {
-               try {
-                       out.flush();
-                       out.close();
-               } finally {
-                       rawOutStream.close();
-            
-                   if ( tocWriter != null ) {
-                       tocWriter.close();
-                   }
-               }
+            try {
+                out.flush();
+                out.close();
+            } finally {
+                rawOutStream.close();
+
+                if ( tocWriter != null ) {
+                    tocWriter.close();
+                }
+            }
         } finally {
             unlock();
         }
-        
+
     }
 
     @Override
@@ -308,14 +308,14 @@ public class StandardRecordWriter implements RecordWriter 
{
 
     @Override
     public void sync() throws IOException {
-       if ( tocWriter != null ) {
-               tocWriter.sync();
-       }
-       fos.getFD().sync();
+        if ( tocWriter != null ) {
+            tocWriter.sync();
+        }
+        fos.getFD().sync();
     }
-    
+
     @Override
     public TocWriter getTocWriter() {
-       return tocWriter;
+        return tocWriter;
     }
 }

Reply via email to