NIFI-803: Ensure that if an OutOfMemoryError occurs, the Provenance Repo won't 
become corrupt


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6a0a321b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6a0a321b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6a0a321b

Branch: refs/heads/NIFI-744
Commit: 6a0a321b64568c70e417b78b2c52ab39af1c3d63
Parents: 496ebfb
Author: Mark Payne <[email protected]>
Authored: Mon Aug 3 13:25:53 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Aug 4 08:54:03 2015 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 210 ++++++++++++-------
 .../nifi/provenance/StandardRecordReader.java   |   8 +-
 .../nifi/provenance/StandardRecordWriter.java   |  24 ++-
 .../provenance/serialization/RecordWriter.java  |  12 +-
 .../TestPersistentProvenanceRepository.java     | 120 ++++++++++-
 5 files changed, 294 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6a0a321b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 81d883a..a1063f0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -152,7 +152,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     private final AtomicInteger rolloverCompletions = new AtomicInteger(0);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
-    private final AtomicBoolean repoDirty = new AtomicBoolean(false);
+    private final AtomicInteger dirtyWriterCount = new AtomicInteger(0);
+
     // we keep the last 1000 records on hand so that when the UI is opened and 
it asks for the last 1000 records we don't need to
     // read them. Since this is a very cheap operation to keep them, it's 
worth the tiny expense for the improved user experience.
     private final RingBuffer<ProvenanceEventRecord> latestRecords = new 
RingBuffer<>(1000);
@@ -338,7 +339,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return config;
     }
 
-    static RecordWriter[] createWriters(final RepositoryConfiguration config, 
final long initialRecordId) throws IOException {
+    // protected in order to override for unit tests
+    protected RecordWriter[] createWriters(final RepositoryConfiguration 
config, final long initialRecordId) throws IOException {
         final List<File> storageDirectories = config.getStorageDirectories();
 
         final RecordWriter[] writers = new 
RecordWriter[config.getJournalCount()];
@@ -561,13 +563,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
         idGenerator.set(maxId + 1);
 
-        // TODO: Consider refactoring this so that we do the rollover actions 
at the same time that we merge the journals.
-        // This would require a few different changes:
-        //      * Rollover Actions would take a ProvenanceEventRecord at a 
time, not a File at a time. Would have to be either
-        //          constructed or "started" for each file and then closed 
after each file
-        //      * The recovery would have to then read through all of the 
journals with the highest starting ID to determine
-        //          the action max id instead of reading the merged file
-        //      * We would write to a temporary file and then rename once the 
merge is complete. This allows us to fail and restart
         try {
             final Set<File> recoveredJournals = recoverJournalFiles();
             filesToRecover.addAll(recoveredJournals);
@@ -654,13 +649,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         final long totalJournalSize;
         readLock.lock();
         try {
-            if (repoDirty.get()) {
-                logger.debug("Cannot persist provenance record because there 
was an IOException last time a record persistence was attempted. "
-                        + "Will not attempt to persist more records until the 
repo has been rolled over.");
-                return;
-            }
-
-            final RecordWriter[] recordWriters = this.writers;
             long bytesWritten = 0L;
 
             // obtain a lock on one of the RecordWriter's so that no other 
thread is able to write to this writer until we're finished.
@@ -669,6 +657,13 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             boolean locked = false;
             RecordWriter writer;
             do {
+                final RecordWriter[] recordWriters = this.writers;
+                final int numDirty = dirtyWriterCount.get();
+                if (numDirty >= recordWriters.length) {
+                    throw new IllegalStateException("Cannot update repository 
because all partitions are unusable at this time. Writing to the repository 
would cause corruption. "
+                        + "This most often happens as a result of the 
repository running out of disk space or the JMV running out of memory.");
+                }
+
                 final long idx = writerIndex.getAndIncrement();
                 writer = recordWriters[(int) (idx % recordWriters.length)];
                 locked = writer.tryLock();
@@ -688,24 +683,31 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
                     totalJournalSize = 
bytesWrittenSinceRollover.addAndGet(bytesWritten);
                     recordsWrittenSinceRollover.getAndIncrement();
-                } catch (final IOException ioe) {
+                } catch (final Throwable t) {
                     // We need to set the repoDirty flag before we release the 
lock for this journal.
                     // Otherwise, another thread may write to this journal -- 
this is a problem because
                     // the journal contains part of our record but not all of 
it. Writing to the end of this
                     // journal will result in corruption!
-                    repoDirty.set(true);
+                    writer.markDirty();
+                    dirtyWriterCount.incrementAndGet();
                     streamStartTime.set(0L);    // force rollover to happen 
soon.
-                    throw ioe;
+                    throw t;
                 } finally {
                     writer.unlock();
                 }
             } catch (final IOException ioe) {
-                logger.error("Failed to persist Provenance Event due to {}. 
Will not attempt to write to the Provenance Repository again until the 
repository has rolled over.", ioe.toString());
+                // warn about the failure
+                logger.error("Failed to persist Provenance Event due to {}.", 
ioe.toString());
                 logger.error("", ioe);
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to persist Provenance Event due to " + ioe.toString() +
-                        ". Will not attempt to write to the Provenance 
Repository again until the repository has rolled over");
-
-                // Switch from readLock to writeLock so that we can perform 
rollover
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to persist Provenance Event due to " + ioe.toString());
+
+                // Attempt to perform a rollover. An IOException in this part 
of the code generally is the result of
+                // running out of disk space. If we have multiple partitions, 
we may well be able to rollover. This helps
+                // in two ways: it compresses the journal files which frees up 
space, and if it ends up merging to a different
+                // partition/storage directory, we can delete the journals 
from this directory that ran out of space.
+                // In order to do this, though, we must switch from a read 
lock to a write lock.
+                // This part of the code gets a little bit messy, and we could 
potentially refactor it a bit in order to
+                // make the code cleaner.
                 readLock.unlock();
                 try {
                     writeLock.lock();
@@ -720,6 +722,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     logger.error("", e);
                     eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to Rollover Provenance Event Repository file due to " + e.toString());
                 } finally {
+                    // we must re-lock the readLock, as the finally block 
below is going to unlock it.
                     readLock.lock();
                 }
 
@@ -752,6 +755,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
     }
 
+    /**
+     * @return all of the Provenance Event Log Files (not the journals, the 
merged files) available across all storage directories.
+     */
     private List<File> getLogFiles() {
         final List<File> files = new ArrayList<>();
         for (final Path path : idToPathMap.get().values()) {
@@ -817,6 +823,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
         }
 
+        // This comparator sorts the data based on the "basename" of the 
files. I.e., the numeric portion.
+        // We do this because the numeric portion represents the ID of the 
first event in the log file.
+        // As a result, we are sorting based on time, since the ID is 
monotonically increasing. By doing this,
+        // we are able to avoid hitting disk continually to check timestamps
         final Comparator<File> sortByBasenameComparator = new 
Comparator<File>() {
             @Override
             public int compare(final File o1, final File o2) {
@@ -930,7 +940,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
     }
 
-    public void waitForRollover() throws IOException {
+    /**
+     * Blocks the calling thread until the repository rolls over. This is 
intended for unit testing.
+     */
+    public void waitForRollover() {
         final int count = rolloverCompletions.get();
         while (rolloverCompletions.get() == count) {
             try {
@@ -940,6 +953,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
     }
 
+    /**
+     * @return the number of journal files that exist across all storage 
directories
+     */
     // made protected for testing purposes
     protected int getJournalCount() {
         // determine how many 'journals' we have in the journals directories
@@ -956,7 +972,12 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
     }
 
     /**
-     * MUST be called with the write lock held
+     * <p>
+     * MUST be called with the write lock held.
+     * </p>
+     *
+     * Rolls over the data in the journal files, merging them into a single 
Provenance Event Log File, and
+     * compressing and indexing as needed.
      *
      * @param force if true, will force a rollover regardless of whether or 
not data has been written
      * @throws IOException if unable to complete rollover
@@ -968,7 +989,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
         // If this is the first time we're creating the out stream, or if we
         // have written something to the stream, then roll over
-        if (recordsWrittenSinceRollover.get() > 0L || repoDirty.get() || 
force) {
+        if (force || recordsWrittenSinceRollover.get() > 0L || 
dirtyWriterCount.get() > 0) {
             final List<File> journalsToMerge = new ArrayList<>();
             for (final RecordWriter writer : writers) {
                 final File writerFile = writer.getFile();
@@ -986,10 +1007,12 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 logger.debug("Going to merge {} files for journals starting 
with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
+            // Choose a storage directory to store the merged file in.
             final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
             final List<File> storageDirs = 
configuration.getStorageDirectories();
             final File storageDir = storageDirs.get((int) (storageDirIdx % 
storageDirs.size()));
 
+            // Run the rollover logic in a background thread.
             final AtomicReference<Future<?>> futureReference = new 
AtomicReference<>();
             final int recordsWritten = 
recordsWrittenSinceRollover.getAndSet(0);
             final Runnable rolloverRunnable = new Runnable() {
@@ -999,10 +1022,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         final File fileRolledOver;
 
                         try {
-                            fileRolledOver = mergeJournals(journalsToMerge, 
storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, 
latestRecords);
-                            repoDirty.set(false);
+                            fileRolledOver = mergeJournals(journalsToMerge, 
getMergeFile(journalsToMerge, storageDir), eventReporter);
                         } catch (final IOException ioe) {
-                            repoDirty.set(true);
                             logger.error("Failed to merge Journal Files {} 
into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
                             logger.error("", ioe);
                             return;
@@ -1046,7 +1067,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 }
             };
 
-            // We are going to schedule the future to run every 10 seconds. 
This allows us to keep retrying if we
+            // We are going to schedule the future to run immediately and then 
repeat every 10 seconds. This allows us to keep retrying if we
             // fail for some reason. When we succeed, the Runnable will cancel 
itself.
             final Future<?> future = 
rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, 
TimeUnit.SECONDS);
             futureReference.set(future);
@@ -1061,6 +1082,13 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             final int journalCountThreshold = configuration.getJournalCount() 
* 5;
             final long sizeThreshold = (long) 
(configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max 
capacity
 
+            // check if we need to apply backpressure.
+            // If we have too many journal files, or if the repo becomes too 
large, backpressure is necessary. Without it,
+            // if the rate at which provenance events are registered exceeds 
the rate at which we can compress/merge/index them,
+            // then eventually we will end up with all of the data stored in 
the 'journals' directory and not yet indexed. This
+            // would mean that the data would never even be accessible. In 
order to prevent this, if we exceed 110% of the configured
+            // max capacity for the repo, or if we have 5 sets of journal 
files waiting to be merged, we will block here until
+            // that is no longer the case.
             if (journalFileCount > journalCountThreshold || repoSize > 
sizeThreshold) {
                 logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
                         + "Slowing down flow to accomodate. Currently, there 
are {} journal files ({} bytes) and "
@@ -1086,14 +1114,17 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         + "journal files to be rolled over is {}", 
journalFileCount);
             }
 
+            // we've finished rolling over successfully. Create new writers 
and reset state.
             writers = createWriters(configuration, idGenerator.get());
+            dirtyWriterCount.set(0);
             streamStartTime.set(System.currentTimeMillis());
             recordsWrittenSinceRollover.getAndSet(0);
         }
     }
 
 
-    private Set<File> recoverJournalFiles() throws IOException {
+    // protected for use in unit tests
+    protected Set<File> recoverJournalFiles() throws IOException {
         if (!configuration.isAllowRollover()) {
             return Collections.emptySet();
         }
@@ -1133,7 +1164,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         for (final List<File> journalFileSet : journalMap.values()) {
             final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
             final File storageDir = storageDirs.get((int) (storageDirIdx % 
storageDirs.size()));
-            final File mergedFile = mergeJournals(journalFileSet, storageDir, 
getMergeFile(journalFileSet, storageDir), eventReporter, latestRecords);
+            final File mergedFile = mergeJournals(journalFileSet, 
getMergeFile(journalFileSet, storageDir), eventReporter);
             if (mergedFile != null) {
                 mergedFiles.add(mergedFile);
             }
@@ -1160,11 +1191,30 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return mergedFile;
     }
 
-    File mergeJournals(final List<File> journalFiles, final File storageDir, 
final File mergedFile, final EventReporter eventReporter,
-            final RingBuffer<ProvenanceEventRecord> ringBuffer) throws 
IOException {
-        logger.debug("Merging {} to {}", journalFiles, mergedFile);
+    /**
+     * <p>
+     * Merges all of the given Journal Files into a single, merged Provenance 
Event Log File. As these records are merged, they will be compressed, if the 
repository is configured to compress records,
+     * and will be indexed.
+     * </p>
+     *
+     * <p>
+     * If the repository is configured to compress the data, the file written 
to may not be the same as the <code>suggestedMergeFile</code>, as a filename 
extension of '.gz' may be appended. If the
+     * journals are successfully merged, the file that they were merged into 
will be returned. If unable to merge the records (for instance, because the 
repository has been closed or because the list
+     * of journal files was empty), this method will return <code>null</code>.
+     * </p>
+     *
+     * @param journalFiles the journal files to merge
+     * @param suggestedMergeFile the file to write the merged records to
+     * @param eventReporter the event reporter to report any warnings or 
errors to; may be null.
+     *
+     * @return the file that the given journals were merged into, or 
<code>null</code> if no records were merged.
+     *
+     * @throws IOException if a problem occurs writing to the mergedFile, 
reading from a journal, or updating the Lucene Index.
+     */
+    File mergeJournals(final List<File> journalFiles, final File 
suggestedMergeFile, final EventReporter eventReporter) throws IOException {
+        logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
         if ( this.closed ) {
-            logger.info("Provenance Repository has been closed; will not merge 
journal files to {}", mergedFile);
+            logger.info("Provenance Repository has been closed; will not merge 
journal files to {}", suggestedMergeFile);
             return null;
         }
 
@@ -1194,7 +1244,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
         // check if we have all of the "partial" files for the journal.
         if (allPartialFiles) {
-            if ( mergedFile.exists() ) {
+            if (suggestedMergeFile.exists()) {
                 // we have all "partial" files and there is already a merged 
file. Delete the data from the index
                 // because the merge file may not be fully merged. We will 
re-merge.
                 logger.warn("Merged Journal File {} already exists; however, 
all partial journal files also exist "
@@ -1202,9 +1252,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
                 final DeleteIndexAction deleteAction = new 
DeleteIndexAction(this, indexConfig, indexManager);
                 try {
-                    deleteAction.execute(mergedFile);
+                    deleteAction.execute(suggestedMergeFile);
                 } catch (final Exception e) {
-                    logger.warn("Failed to delete records from Journal File {} 
from the index; this could potentially result in duplicates. Failure was due to 
{}", mergedFile, e.toString());
+                    logger.warn("Failed to delete records from Journal File {} 
from the index; this could potentially result in duplicates. Failure was due to 
{}", suggestedMergeFile, e.toString());
                     if ( logger.isDebugEnabled() ) {
                         logger.warn("", e);
                     }
@@ -1213,15 +1263,15 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 // Since we only store the file's basename, block offset, and 
event ID, and because the newly created file could end up on
                 // a different Storage Directory than the original, we need to 
ensure that we delete both the partially merged
                 // file and the TOC file. Otherwise, we could get the wrong 
copy and have issues retrieving events.
-                if ( !mergedFile.delete() ) {
+                if (!suggestedMergeFile.delete()) {
                     logger.error("Failed to delete partially written 
Provenance Journal File {}. This may result in events from this journal "
-                            + "file not being able to be displayed. This file 
should be deleted manually.", mergedFile);
+                        + "file not being able to be displayed. This file 
should be deleted manually.", suggestedMergeFile);
                 }
 
-                final File tocFile = TocUtil.getTocFile(mergedFile);
+                final File tocFile = TocUtil.getTocFile(suggestedMergeFile);
                 if ( tocFile.exists() && !tocFile.delete() ) {
                     logger.error("Failed to delete .toc file {}; this may 
result in not being able to read the Provenance Events from the {} Journal 
File. "
-                            + "This can be corrected by manually deleting the 
{} file", tocFile, mergedFile, tocFile);
+                        + "This can be corrected by manually deleting the {} 
file", tocFile, suggestedMergeFile, tocFile);
                 }
             }
         } else {
@@ -1245,7 +1295,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         int records = 0;
 
         final boolean isCompress = configuration.isCompressOnRollover();
-        final File writerFile = isCompress ? new 
File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile;
+        final File writerFile = isCompress ? new 
File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") 
: suggestedMergeFile;
 
         try {
             for (final File journalFile : journalFiles) {
@@ -1293,8 +1343,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         logger.warn("", e);
                     }
 
-                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + 
e +
+                    if (eventReporter != null) {
+                        eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + 
e +
                             "; it's possible that hte record wasn't completely 
written to the file. This record will be skipped.");
+                    }
                 }
 
                 if (record == null) {
@@ -1332,37 +1384,42 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 try {
                     long maxId = 0L;
 
-                    while (!recordToReaderMap.isEmpty()) {
-                        final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                        final StandardProvenanceEventRecord record = 
entry.getKey();
-                        final RecordReader reader = entry.getValue();
+                    try {
+                        while (!recordToReaderMap.isEmpty()) {
+                            final Map.Entry<StandardProvenanceEventRecord, 
RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                            final StandardProvenanceEventRecord record = 
entry.getKey();
+                            final RecordReader reader = entry.getValue();
 
-                        writer.writeRecord(record, record.getEventId());
-                        final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
+                            writer.writeRecord(record, record.getEventId());
+                            final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
 
-                        indexingAction.index(record, indexWriter, blockIndex);
-                        maxId = record.getEventId();
+                            indexingAction.index(record, indexWriter, 
blockIndex);
+                            maxId = record.getEventId();
 
-                        latestRecords.add(truncateAttributes(record));
-                        records++;
+                            latestRecords.add(truncateAttributes(record));
+                            records++;
 
-                        // Remove this entry from the map
-                        recordToReaderMap.remove(record);
+                            // Remove this entry from the map
+                            recordToReaderMap.remove(record);
 
-                        // Get the next entry from this reader and add it to 
the map
-                        StandardProvenanceEventRecord nextRecord = null;
+                            // Get the next entry from this reader and add it 
to the map
+                            StandardProvenanceEventRecord nextRecord = null;
 
-                        try {
-                            nextRecord = reader.nextRecord();
-                        } catch (final EOFException eof) {
-                        }
+                            try {
+                                nextRecord = reader.nextRecord();
+                            } catch (final EOFException eof) {
+                            }
 
-                        if (nextRecord != null) {
-                            recordToReaderMap.put(nextRecord, reader);
+                            if (nextRecord != null) {
+                                recordToReaderMap.put(nextRecord, reader);
+                            }
                         }
+                        indexWriter.commit();
+                    } catch (final Throwable t) {
+                        indexWriter.rollback();
+                        throw t;
                     }
 
-                    indexWriter.commit();
                     indexConfig.setMaxIdIndexed(maxId);
                 } finally {
                     indexManager.returnIndexWriter(indexingDirectory, 
indexWriter);
@@ -1370,10 +1427,11 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
 
             // record should now be available in the repository. We can copy 
the values from latestRecords to ringBuffer.
+            final RingBuffer<ProvenanceEventRecord> latestRecordBuffer = 
this.latestRecords;
             latestRecords.forEach(new 
ForEachEvaluator<ProvenanceEventRecord>() {
                 @Override
                 public boolean evaluate(final ProvenanceEventRecord event) {
-                    ringBuffer.add(event);
+                    latestRecordBuffer.add(event);
                     return true;
                 }
             });
@@ -1390,13 +1448,21 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         for (final File journalFile : journalFiles) {
             if (!journalFile.delete() && journalFile.exists()) {
                 logger.warn("Failed to remove temporary journal file {}; this 
file should be cleaned up manually", journalFile.getAbsolutePath());
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; 
this file should be cleaned up manually");
+
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to remove temporary journal file " +
+                        journalFile.getAbsolutePath() + "; this file should be 
cleaned up manually");
+                }
             }
 
             final File tocFile = TocUtil.getTocFile(journalFile);
             if (!tocFile.delete() && tocFile.exists()) {
                 logger.warn("Failed to remove temporary journal TOC file {}; 
this file should be cleaned up manually", tocFile.getAbsolutePath());
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; 
this file should be cleaned up manually");
+
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, "Failed to remove temporary journal TOC file " +
+                        tocFile.getAbsolutePath() + "; this file should be 
cleaned up manually");
+                }
             }
         }
 
@@ -1406,7 +1472,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         } else {
             final long nanos = System.nanoTime() - startNanos;
             final long millis = TimeUnit.MILLISECONDS.convert(nanos, 
TimeUnit.NANOSECONDS);
-            logger.info("Successfully merged {} journal files ({} records) 
into single Provenance Log File {} in {} milliseconds", journalFiles.size(), 
records, mergedFile, millis);
+            logger.info("Successfully merged {} journal files ({} records) 
into single Provenance Log File {} in {} milliseconds", journalFiles.size(), 
records, suggestedMergeFile, millis);
         }
 
         return writerFile;
@@ -1850,7 +1916,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             return true;
         }
 
-        if (repoDirty.get() || writtenSinceRollover > 0 && 
System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) {
+        if ((dirtyWriterCount.get() > 0) || (writtenSinceRollover > 0 && 
System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) {
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a0a321b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 5221ebc..42bc8e9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.nifi.provenance.serialization.RecordReader;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader implements RecordReader {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordReader.class);
+    private static final Pattern UUID_PATTERN = 
Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");
 
     private final ByteCountingInputStream rawInputStream;
     private final String filename;
@@ -394,7 +396,11 @@ public class StandardRecordReader implements RecordReader {
             // write less data. However, in version 8 we changed to just 
writing
             // out the string because it's extremely expensive to call 
UUID.fromString.
             // In the end, since we generally compress, the savings are minimal 
anyway.
-            return in.readUTF();
+            final String uuid = in.readUTF();
+            if (!UUID_PATTERN.matcher(uuid).matches()) {
+                throw new IOException("Failed to parse Provenance Event 
Record: expected a UUID but got: " + uuid);
+            }
+            return uuid;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a0a321b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 50caee1..a8c0dd0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -44,6 +45,7 @@ public class StandardRecordWriter implements RecordWriter {
     private final TocWriter tocWriter;
     private final boolean compressed;
     private final int uncompressedBlockSize;
+    private final AtomicBoolean dirtyFlag = new AtomicBoolean(false);
 
     private DataOutputStream out;
     private ByteCountingOutputStream byteCountingOut;
@@ -65,11 +67,11 @@ public class StandardRecordWriter implements RecordWriter {
         this.tocWriter = writer;
     }
 
-    static void writeUUID(final DataOutputStream out, final String uuid) 
throws IOException {
+    protected void writeUUID(final DataOutputStream out, final String uuid) 
throws IOException {
         out.writeUTF(uuid);
     }
 
-    static void writeUUIDs(final DataOutputStream out, final 
Collection<String> list) throws IOException {
+    protected void writeUUIDs(final DataOutputStream out, final 
Collection<String> list) throws IOException {
         if (list == null) {
             out.writeInt(0);
         } else {
@@ -241,7 +243,7 @@ public class StandardRecordWriter implements RecordWriter {
         return byteCountingOut.getBytesWritten() - startBytes;
     }
 
-    private void writeNullableString(final DataOutputStream out, final String 
toWrite) throws IOException {
+    protected void writeNullableString(final DataOutputStream out, final 
String toWrite) throws IOException {
         if (toWrite == null) {
             out.writeBoolean(false);
         } else {
@@ -304,7 +306,16 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public boolean tryLock() {
-        return lock.tryLock();
+        final boolean obtainedLock = lock.tryLock();
+        if (obtainedLock && dirtyFlag.get()) {
+            // once we have obtained the lock, we need to check if the writer
+            // has been marked dirty. If so, we cannot write to the underlying
+            // file, so we need to unlock and return false. Otherwise, it's 
okay
+            // to write to the underlying file, so return true.
+            lock.unlock();
+            return false;
+        }
+        return obtainedLock;
     }
 
     @Override
@@ -324,4 +335,9 @@ public class StandardRecordWriter implements RecordWriter {
     public TocWriter getTocWriter() {
         return tocWriter;
     }
+
+    @Override
+    public void markDirty() {
+        dirtyFlag.set(true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a0a321b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 7c9bcc0..03f1ad0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -67,15 +67,23 @@ public interface RecordWriter extends Closeable {
     /**
      * Attempts to obtain a mutually exclusive lock for this Writer so that
      * operations that must be atomic can be achieved atomically. If the lock 
is
-     * not immediately available, returns <code>false</code>; otherwise, 
obtains
-     * the lock and returns <code>true</code>.
+     * not immediately available, or if the writer is 'dirty' (see {@link 
#markDirty()}),
+     * returns <code>false</code>; otherwise, obtains the lock and returns 
<code>true</code>.
      *
      * @return <code>true</code> if the lock was obtained, <code>false</code> 
otherwise.
      */
     boolean tryLock();
 
     /**
+     * Indicates that this Record Writer is 'dirty', meaning that it can no 
longer be
+     * updated. This can happen, for example, if a partial record is written. 
In this case,
+     * writing to this RecordWriter again could cause corruption.
+     */
+    void markDirty();
+
+    /**
      * Syncs the content written to this writer to disk.
+     *
      * @throws IOException if unable to sync content to disk
      */
     void sync() throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a0a321b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 3737588..713180f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -28,10 +28,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -58,15 +60,21 @@ import org.apache.nifi.provenance.search.SearchTerms;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestPersistentProvenanceRepository {
 
@@ -549,7 +557,12 @@ public class TestPersistentProvenanceRepository {
             builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
 
             for (int i = 0; i < 10; i++) {
-                attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 
String.valueOf(i + j * 10));
+                String uuidSuffix = String.valueOf(i + j * 10);
+                if (uuidSuffix.length() < 2) {
+                    uuidSuffix = "0" + uuidSuffix;
+                }
+
+                attributes.put("uuid", "00000000-0000-0000-0000-0000000000" + 
uuidSuffix);
                 builder.fromFlowFile(createFlowFile(i + j * 10, 3000L, 
attributes));
                 repo.registerEvent(builder.build());
             }
@@ -1202,4 +1215,109 @@ public class TestPersistentProvenanceRepository {
         assertEquals("12345678901234567890123456789012345678901234567890", 
retrieved.getAttributes().get("75chars"));
     }
 
+    @Test
+    public void testBehaviorOnOutOfMemory() throws IOException, 
InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.MINUTES);
+        config.setJournalCount(4);
+
+        // Create a repository that overrides the createWriters() method so 
that we can return writers that will throw
+        // OutOfMemoryError where we want to
+        final AtomicBoolean causeOOME = new AtomicBoolean(false);
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected RecordWriter[] createWriters(RepositoryConfiguration 
config, long initialRecordId) throws IOException {
+                final RecordWriter[] recordWriters = 
super.createWriters(config, initialRecordId);
+
+                // Spy on each of the writers so that a call to writeUUID 
throws an OutOfMemoryError if we set the
+                // causeOOME flag to true
+                final StandardRecordWriter[] spiedWriters = new 
StandardRecordWriter[recordWriters.length];
+                for (int i = 0; i < recordWriters.length; i++) {
+                    final StandardRecordWriter writer = (StandardRecordWriter) 
recordWriters[i];
+
+                    spiedWriters[i] = Mockito.spy(writer);
+                    Mockito.doAnswer(new Answer<Object>() {
+                        @Override
+                        public Object answer(final InvocationOnMock 
invocation) throws Throwable {
+                            if (causeOOME.get()) {
+                                throw new OutOfMemoryError();
+                            } else {
+                                writer.writeUUID(invocation.getArgumentAt(0, 
DataOutputStream.class), invocation.getArgumentAt(1, String.class));
+                            }
+                            return null;
+                        }
+                    
}).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), 
Mockito.any(String.class));
+                }
+
+                // return the writers that we are spying on
+                return spiedWriters;
+            }
+        };
+        repo.initialize(getEventReporter());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("75chars", 
"123456789012345678901234567890123456789012345678901234567890123456789012345");
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        // first make sure that we are able to write to the repo successfully.
+        for (int i = 0; i < 4; i++) {
+            final ProvenanceEventRecord record = builder.build();
+            repo.registerEvent(record);
+        }
+
+        // cause OOME to occur
+        causeOOME.set(true);
+
+        // write 4 times to make sure that we mark all partitions as dirty
+        for (int i = 0; i < 4; i++) {
+            final ProvenanceEventRecord record = builder.build();
+            try {
+                repo.registerEvent(record);
+                Assert.fail("Expected OutOfMemoryError but was able to 
register event");
+            } catch (final OutOfMemoryError oome) {
+            }
+        }
+
+        // now that all partitions are dirty, ensure that as we keep trying to 
write, we get an IllegalStateException
+        // and that we don't corrupt the repository by writing partial records
+        for (int i = 0; i < 8; i++) {
+            final ProvenanceEventRecord record = builder.build();
+            try {
+                repo.registerEvent(record);
+                Assert.fail("Expected IllegalStateException but was able to 
register event");
+            } catch (final IllegalStateException ise) {
+            }
+        }
+
+        // close repo so that we can create a new one to recover records
+        repo.close();
+
+        // make sure we can recover
+        final PersistentProvenanceRepository recoveryRepo = new 
PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected Set<File> recoverJournalFiles() throws IOException {
+                try {
+                    return super.recoverJournalFiles();
+                } catch (final IOException ioe) {
+                    Assert.fail("Failed to recover properly");
+                    return null;
+                }
+            }
+        };
+
+        try {
+            recoveryRepo.initialize(getEventReporter());
+        } finally {
+            recoveryRepo.close();
+        }
+    }
+
 }

Reply via email to