Repository: nifi
Updated Branches:
  refs/heads/master 5c36358bc -> 05dabe034


NIFI-1433: Once we roll over journal files, don't attempt to roll them over 
again

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/05dabe03
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/05dabe03
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/05dabe03

Branch: refs/heads/master
Commit: 05dabe034c3bc9db73e8e83d1b026212ca871aad
Parents: 5c36358
Author: Mark Payne <[email protected]>
Authored: Sat Jan 23 15:29:58 2016 -0500
Committer: Aldrin Piri <[email protected]>
Committed: Sun Jan 24 15:29:47 2016 -0500

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 162 +++++++++++--------
 .../nifi/provenance/StandardRecordWriter.java   |   7 +
 .../provenance/serialization/RecordWriter.java  |   5 +
 .../TestPersistentProvenanceRepository.java     | 109 ++++++++++++-
 4 files changed, 209 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/05dabe03/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 1740f51..2b7843a 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -240,6 +240,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                                     } catch (final Exception e) {
                                         logger.error("Failed to roll over 
Provenance Event Log due to {}", e.toString());
                                         logger.error("", e);
+                                        
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to roll over 
Provenance Event Log due to " + e.toString());
                                     }
                                 }
                             } finally {
@@ -730,7 +731,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 } catch (final Exception e) {
                     logger.error("Failed to Rollover Provenance Event 
Repository file due to {}", e.toString());
                     logger.error("", e);
-                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to Rollover Provenance Event Repository file due to " + e.toString());
+                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to Rollover Provenance Event Log due to " + e.toString());
                 } finally {
                     // we must re-lock the readLock, as the finally block 
below is going to unlock it.
                     readLock.lock();
@@ -756,7 +757,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     } catch (final IOException e) {
                         logger.error("Failed to Rollover Provenance Event 
Repository file due to {}", e.toString());
                         logger.error("", e);
-                        eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + 
e.toString());
+                        eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to Rollover Provenance Event Log due to " + 
e.toString());
                     }
                 }
             } finally {
@@ -1142,6 +1143,22 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         return journalFileCount;
     }
 
+
+    /**
+     * Method is exposed for unit testing
+     *
+     * @param force whether or not to force a rollover.
+     * @throws IOException if unable to complete rollover
+     */
+    void rolloverWithLock(final boolean force) throws IOException {
+        writeLock.lock();
+        try {
+            rollover(force);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     /**
      * <p>
      * MUST be called with the write lock held.
@@ -1163,19 +1180,26 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         if (force || recordsWrittenSinceRollover.get() > 0L || 
dirtyWriterCount.get() > 0) {
             final List<File> journalsToMerge = new ArrayList<>();
             for (final RecordWriter writer : writers) {
-                final File writerFile = writer.getFile();
-                journalsToMerge.add(writerFile);
-                try {
-                    writer.close();
-                } catch (final IOException ioe) {
-                    logger.warn("Failed to close {} due to {}", writer, 
ioe.toString());
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", ioe);
+                if (!writer.isClosed()) {
+                    final File writerFile = writer.getFile();
+                    journalsToMerge.add(writerFile);
+                    try {
+                        writer.close();
+                    } catch (final IOException ioe) {
+                        logger.warn("Failed to close {} due to {}", writer, 
ioe.toString());
+                        if (logger.isDebugEnabled()) {
+                            logger.warn("", ioe);
+                        }
                     }
                 }
             }
-            if ( logger.isDebugEnabled() ) {
-                logger.debug("Going to merge {} files for journals starting 
with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+
+            if (logger.isDebugEnabled()) {
+                if (journalsToMerge.isEmpty()) {
+                    logger.debug("No journals to merge; all RecordWriters were 
already closed");
+                } else {
+                    logger.debug("Going to merge {} files for journals 
starting with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+                }
             }
 
             // Choose a storage directory to store the merged file in.
@@ -1183,66 +1207,69 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             final List<File> storageDirs = 
configuration.getStorageDirectories();
             final File storageDir = storageDirs.get((int) (storageDirIdx % 
storageDirs.size()));
 
-            // Run the rollover logic in a background thread.
-            final AtomicReference<Future<?>> futureReference = new 
AtomicReference<>();
-            final int recordsWritten = 
recordsWrittenSinceRollover.getAndSet(0);
-            final Runnable rolloverRunnable = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        final File fileRolledOver;
-
+            Future<?> future = null;
+            if (!journalsToMerge.isEmpty()) {
+                // Run the rollover logic in a background thread.
+                final AtomicReference<Future<?>> futureReference = new 
AtomicReference<>();
+                final int recordsWritten = 
recordsWrittenSinceRollover.getAndSet(0);
+                final Runnable rolloverRunnable = new Runnable() {
+                    @Override
+                    public void run() {
                         try {
-                            fileRolledOver = mergeJournals(journalsToMerge, 
getMergeFile(journalsToMerge, storageDir), eventReporter);
-                        } catch (final IOException ioe) {
-                            logger.error("Failed to merge Journal Files {} 
into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                            logger.error("", ioe);
-                            return;
-                        }
+                            final File fileRolledOver;
 
-                        if (fileRolledOver == null) {
-                            logger.debug("Couldn't merge journals. Will try 
again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, 
storageDir);
-                            return;
-                        }
-                        final File file = fileRolledOver;
-
-                        // update our map of id to Path
-                        // We need to make sure that another thread doesn't 
also update the map at the same time. We cannot
-                        // use the write lock when purging old events, and we 
want to use the same approach here.
-                        boolean updated = false;
-                        final Long fileFirstEventId = 
Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
-                        while (!updated) {
-                            final SortedMap<Long, Path> existingPathMap = 
idToPathMap.get();
-                            final SortedMap<Long, Path> newIdToPathMap = new 
TreeMap<>(new PathMapComparator());
-                            newIdToPathMap.putAll(existingPathMap);
-                            newIdToPathMap.put(fileFirstEventId, 
file.toPath());
-                            updated = 
idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
-                        }
+                            try {
+                                fileRolledOver = 
mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), 
eventReporter);
+                            } catch (final IOException ioe) {
+                                logger.error("Failed to merge Journal Files {} 
into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                                logger.error("", ioe);
+                                return;
+                            }
+
+                            if (fileRolledOver == null) {
+                                logger.debug("Couldn't merge journals. Will 
try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, 
storageDir);
+                                return;
+                            }
+                            final File file = fileRolledOver;
+
+                            // update our map of id to Path
+                            // We need to make sure that another thread 
doesn't also update the map at the same time. We cannot
+                            // use the write lock when purging old events, and 
we want to use the same approach here.
+                            boolean updated = false;
+                            final Long fileFirstEventId = 
Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                            while (!updated) {
+                                final SortedMap<Long, Path> existingPathMap = 
idToPathMap.get();
+                                final SortedMap<Long, Path> newIdToPathMap = 
new TreeMap<>(new PathMapComparator());
+                                newIdToPathMap.putAll(existingPathMap);
+                                newIdToPathMap.put(fileFirstEventId, 
file.toPath());
+                                updated = 
idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
+                            }
 
-                        logger.info("Successfully Rolled over Provenance Event 
file containing {} records", recordsWritten);
-                        rolloverCompletions.getAndIncrement();
+                            logger.info("Successfully Rolled over Provenance 
Event file containing {} records", recordsWritten);
+                            rolloverCompletions.getAndIncrement();
 
-                        // We have finished successfully. Cancel the future so 
that we don't run anymore
-                        Future<?> future;
-                        while ((future = futureReference.get()) == null) {
-                            try {
-                                Thread.sleep(10L);
-                            } catch (final InterruptedException ie) {
+                            // We have finished successfully. Cancel the 
future so that we don't run anymore
+                            Future<?> future;
+                            while ((future = futureReference.get()) == null) {
+                                try {
+                                    Thread.sleep(10L);
+                                } catch (final InterruptedException ie) {
+                                }
                             }
-                        }
 
-                        future.cancel(false);
-                    } catch (final Throwable t) {
-                        logger.error("Failed to rollover Provenance repository 
due to {}", t.toString());
-                        logger.error("", t);
+                            future.cancel(false);
+                        } catch (final Throwable t) {
+                            logger.error("Failed to rollover Provenance 
repository due to {}", t.toString());
+                            logger.error("", t);
+                        }
                     }
-                }
-            };
+                };
 
-            // We are going to schedule the future to run immediately and then 
repeat every 10 seconds. This allows us to keep retrying if we
-            // fail for some reason. When we succeed, the Runnable will cancel 
itself.
-            final Future<?> future = 
rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, 
TimeUnit.SECONDS);
-            futureReference.set(future);
+                // We are going to schedule the future to run immediately and 
then repeat every 10 seconds. This allows us to keep retrying if we
+                // fail for some reason. When we succeed, the Runnable will 
cancel itself.
+                future = 
rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, 
TimeUnit.SECONDS);
+                futureReference.set(future);
+            }
 
             streamStartTime.set(System.currentTimeMillis());
             bytesWrittenSinceRollover.set(0);
@@ -1271,7 +1298,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 while (journalFileCount > journalCountThreshold || repoSize > 
sizeThreshold) {
                     // if a shutdown happens while we are in this loop, kill 
the rollover thread and break
                     if (this.closed.get()) {
-                        future.cancel(true);
+                        if (future != null) {
+                            future.cancel(true);
+                        }
+
                         break;
                     }
 
@@ -1504,7 +1534,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     }
 
                     if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
+                        eventReporter.reportEvent(Severity.ERROR, 
EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/05dabe03/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 981301e..a5c121a 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -51,6 +51,7 @@ public class StandardRecordWriter implements RecordWriter {
     private ByteCountingOutputStream byteCountingOut;
     private long lastBlockOffset = 0L;
     private int recordCount = 0;
+    private volatile boolean closed = false;
 
     private final Lock lock = new ReentrantLock();
 
@@ -295,6 +296,8 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public synchronized void close() throws IOException {
+        closed = true;
+
         logger.trace("Closing Record Writer for {}", file.getName());
 
         lock();
@@ -330,7 +333,11 @@ public class StandardRecordWriter implements RecordWriter {
         } finally {
             unlock();
         }
+    }
 
+    @Override
+    public boolean isClosed() {
+        return closed;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/05dabe03/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 03f1ad0..b157ccc 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -92,4 +92,9 @@ public interface RecordWriter extends Closeable {
      * @return the TOC Writer that is being used to write the Table of 
Contents for this journal
      */
     TocWriter getTocWriter();
+
+    /**
+     * @return <code>true</code> if this Writer has been closed via the {@link 
#close()} method, <code>false</code> otherwise
+     */
+    boolean isClosed();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/05dabe03/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 02b9216..4a5c08c 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -27,6 +27,7 @@ import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -89,6 +90,8 @@ public class TestPersistentProvenanceRepository {
     private RepositoryConfiguration config;
 
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
+    private EventReporter eventReporter;
+    private List<ReportedEvent> reportedEvents = 
Collections.synchronizedList(new ArrayList<ReportedEvent>());
 
     private RepositoryConfiguration createConfiguration() {
         config = new RepositoryConfiguration();
@@ -107,6 +110,17 @@ public class TestPersistentProvenanceRepository {
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + 
name.getMethodName() + "  *****************************");
+
+        reportedEvents.clear();
+        eventReporter = new EventReporter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public void reportEvent(Severity severity, String category, String 
message) {
+                reportedEvents.add(new ReportedEvent(severity, category, 
message));
+                System.out.println(severity + " : " + category + " : " + 
message);
+            }
+        };
     }
 
     @After
@@ -146,14 +160,7 @@ public class TestPersistentProvenanceRepository {
 
 
     private EventReporter getEventReporter() {
-        return new EventReporter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public void reportEvent(Severity severity, String category, String 
message) {
-                System.out.println(severity + " : " + category + " : " + 
message);
-            }
-        };
+        return eventReporter;
     }
 
     @Test
@@ -1238,6 +1245,68 @@ public class TestPersistentProvenanceRepository {
         assertEquals("12345678901234567890123456789012345678901234567890", 
retrieved.getAttributes().get("75chars"));
     }
 
+
+    @Test
+    public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() 
throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxAttributeChars(50);
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+
+        // Create a repo whose second call to createWriters fails; every other call succeeds.
+        final IOException failure = new IOException("Already created writers 
once. Unit test causing failure.");
+        repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
+            int iterations = 0;
+
+            @Override
+            protected RecordWriter[] createWriters(RepositoryConfiguration 
config, long initialRecordId) throws IOException {
+                if (iterations++ == 1) {
+                    throw failure;
+                } else {
+                    return super.createWriters(config, initialRecordId);
+                }
+            }
+        };
+
+        // initialize with our event reporter
+        repo.initialize(getEventReporter());
+
+        // create some events in the journal files.
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("75chars", 
"123456789012345678901234567890123456789012345678901234567890123456789012345");
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 50; i++) {
+            final ProvenanceEventRecord event = builder.build();
+            repo.registerEvent(event);
+        }
+
+        // Attempt to rollover but fail to create new writers.
+        try {
+            repo.rolloverWithLock(true);
+            Assert.fail("Expected to get IOException when calling 
rolloverWithLock");
+        } catch (final IOException ioe) {
+            assertTrue(ioe == failure);
+        }
+
+        // Wait for the first rollover to succeed.
+        repo.waitForRollover();
+
+        // This time when we rollover, we should not have a problem rolling 
over.
+        repo.rolloverWithLock(true);
+
+        // Ensure that no errors were reported.
+        assertEquals(0, reportedEvents.size());
+    }
+
+
     @Test
     public void testBehaviorOnOutOfMemory() throws IOException, 
InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
@@ -1343,4 +1412,28 @@ public class TestPersistentProvenanceRepository {
         }
     }
 
+
+    private static class ReportedEvent {
+        private final Severity severity;
+        private final String category;
+        private final String message;
+
+        public ReportedEvent(final Severity severity, final String category, 
final String message) {
+            this.severity = severity;
+            this.category = category;
+            this.message = message;
+        }
+
+        public String getCategory() {
+            return category;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public Severity getSeverity() {
+            return severity;
+        }
+    }
 }

Reply via email to