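NIFI-566: Fixed backpressure issues in PersistentProvenanceRepository (avoid deadlock between rollover and purgeOldEvents; also throttle on repository size)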

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e30cd23f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e30cd23f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e30cd23f

Branch: refs/heads/develop
Commit: e30cd23fc258c5831761249536e6a1a83b93f5e0
Parents: 5aeac2e
Author: Mark Payne <[email protected]>
Authored: Thu Apr 30 13:12:07 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Apr 30 13:12:07 2015 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 115 ++++++++++---------
 1 file changed, 59 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e30cd23f/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 6e05535..27f2cbb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -240,6 +240,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     }
                 }, rolloverCheckMillis, rolloverCheckMillis, 
TimeUnit.MILLISECONDS);
 
+                expirationActions.add(new DeleteIndexAction(this, indexConfig, 
indexManager));
+                expirationActions.add(new FileRemovalAction());
+
                 scheduledExecService.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
                 scheduledExecService.scheduleWithFixedDelay(new Runnable() {
                     @Override
@@ -255,9 +258,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         }
                     }
                 }, 1L, 1L, TimeUnit.MINUTES);
-
-                expirationActions.add(new DeleteIndexAction(this, indexConfig, 
indexManager));
-                expirationActions.add(new FileRemovalAction());
             }
         } finally {
             writeLock.unlock();
@@ -825,8 +825,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
             }
         };
 
-        // If we have too much data, start aging it off
-        if (bytesUsed > configuration.getMaxStorageCapacity()) {
+        // If we have too much data (at least 90% of our max capacity), start 
aging it off
+        if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) {
             Collections.sort(sortedByBasename, sortByBasenameComparator);
 
             for (final File file : sortedByBasename) {
@@ -879,15 +879,15 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         }
 
         // Update the Map ID to Path map to not include the removed file
-        // we use a lock here because otherwise we have a check-then-modify 
between get/set on the AtomicReference.
-        // But we keep the AtomicReference because in other places we do just 
a .get()
-        writeLock.lock();
-        try {
-            logger.debug("Obtained write lock to update ID/Path Map for 
expiration");
-
-            final SortedMap<Long, Path> pathMap = idToPathMap.get();
+        // We cannot obtain the write lock here because there may be a need 
for the lock in the rollover method,
+        // if we have 'backpressure applied'. This would result in a deadlock 
because the rollover method would be
+        // waiting for purgeOldEvents, and purgeOldEvents would be waiting for 
the write lock held by rollover.
+        boolean updated = false;
+        while (!updated) {
+            final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
             final SortedMap<Long, Path> newPathMap = new TreeMap<>(new 
PathMapComparator());
-            newPathMap.putAll(pathMap);
+            newPathMap.putAll(existingPathMap);
+
             final Iterator<Map.Entry<Long, Path>> itr = 
newPathMap.entrySet().iterator();
             while (itr.hasNext()) {
                 final Map.Entry<Long, Path> entry = itr.next();
@@ -898,10 +898,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                     itr.remove();
                 }
             }
-            idToPathMap.set(newPathMap);
+
+            updated = idToPathMap.compareAndSet(existingPathMap, newPathMap);
             logger.debug("After expiration, path map: {}", newPathMap);
-        } finally {
-            writeLock.unlock();
         }
     }
 
@@ -961,36 +960,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                 logger.debug("Going to merge {} files for journals starting 
with ID {}", journalsToMerge.size(), 
LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
-            int journalFileCount = getJournalCount();
-            final int journalCountThreshold = configuration.getJournalCount() 
* 5;
-            if ( journalFileCount > journalCountThreshold ) {
-                logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
-                        + "Slowing down flow to accomodate. Currently, there 
are {} journal files and "
-                        + "threshold for blocking is {}", journalFileCount, 
journalCountThreshold);
-                eventReporter.reportEvent(Severity.WARNING, "Provenance 
Repository", "The rate of the dataflow is "
-                        + "exceeding the provenance recording rate. Slowing 
down flow to accomodate");
-
-                while (journalFileCount > journalCountThreshold) {
-                    try {
-                        Thread.sleep(1000L);
-                    } catch (final InterruptedException ie) {
-                    }
-
-                    logger.debug("Provenance Repository is still behind. 
Keeping flow slowed down "
-                            + "to accomodate. Currently, there are {} journal 
files and "
-                            + "threshold for blocking is {}", 
journalFileCount, journalCountThreshold);
-
-                    journalFileCount = getJournalCount();
-                }
-
-                logger.info("Provenance Repository has no caught up with 
rolling over journal files. Current number of "
-                        + "journal files to be rolled over is {}", 
journalFileCount);
-            }
-
-            writers = createWriters(configuration, idGenerator.get());
-            streamStartTime.set(System.currentTimeMillis());
-            recordsWrittenSinceRollover.getAndSet(0);
-
             final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
             final List<File> storageDirs = 
configuration.getStorageDirectories();
             final File storageDir = storageDirs.get((int) (storageDirIdx % 
storageDirs.size()));
@@ -1019,18 +988,16 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
                         final File file = fileRolledOver;
 
                         // update our map of id to Path
-                        // need lock to update the map, even though it's an 
AtomicReference, AtomicReference allows those doing a
-                        // get() to obtain the most up-to-date version but we 
use a writeLock to prevent multiple threads modifying
-                        // it at one time
-                        writeLock.lock();
-                        try {
-                            final Long fileFirstEventId = 
Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                        // We need to make sure that another thread doesn't 
also update the map at the same time. We cannot
+                        // use the write lock when purging old events, and we 
want to use the same approach here.
+                        boolean updated = false;
+                        final Long fileFirstEventId = 
Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                        while (!updated) {
+                            final SortedMap<Long, Path> existingPathMap = 
idToPathMap.get();
                             final SortedMap<Long, Path> newIdToPathMap = new 
TreeMap<>(new PathMapComparator());
-                            newIdToPathMap.putAll(idToPathMap.get());
+                            newIdToPathMap.putAll(existingPathMap);
                             newIdToPathMap.put(fileFirstEventId, 
file.toPath());
-                            idToPathMap.set(newIdToPathMap);
-                        } finally {
-                            writeLock.unlock();
+                            updated = 
idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
                         }
 
                         logger.info("Successfully Rolled over Provenance Event 
file containing {} records", recordsWritten);
@@ -1060,6 +1027,42 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
 
             streamStartTime.set(System.currentTimeMillis());
             bytesWrittenSinceRollover.set(0);
+
+            // We don't want to create new 'writers' until the number of 
unmerged journals falls below our threshold. So we wait
+            // here before we repopulate the 'writers' member variable and 
release the lock.
+            int journalFileCount = getJournalCount();
+            long repoSize = getSize(getLogFiles(), 0L);
+            final int journalCountThreshold = configuration.getJournalCount() 
* 5;
+            final long sizeThreshold = (long) 
(configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max 
capacity
+
+            if (journalFileCount > journalCountThreshold || repoSize > 
sizeThreshold) {
+                logger.warn("The rate of the dataflow is exceeding the 
provenance recording rate. "
+                        + "Slowing down flow to accomodate. Currently, there 
are {} journal files ({} bytes) and "
+                        + "threshold for blocking is {} ({} bytes)", 
journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
+                eventReporter.reportEvent(Severity.WARNING, "Provenance 
Repository", "The rate of the dataflow is "
+                        + "exceeding the provenance recording rate. Slowing 
down flow to accomodate");
+
+                while (journalFileCount > journalCountThreshold || repoSize > 
sizeThreshold) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                    }
+
+                    logger.debug("Provenance Repository is still behind. 
Keeping flow slowed down "
+                            + "to accomodate. Currently, there are {} journal 
files ({} bytes) and "
+                            + "threshold for blocking is {} ({} bytes)", 
journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
+
+                    journalFileCount = getJournalCount();
+                    repoSize = getSize(getLogFiles(), 0L);
+                }
+
+                logger.info("Provenance Repository has now caught up with 
rolling over journal files. Current number of "
+                        + "journal files to be rolled over is {}", 
journalFileCount);
+            }
+
+            writers = createWriters(configuration, idGenerator.get());
+            streamStartTime.set(System.currentTimeMillis());
+            recordsWrittenSinceRollover.getAndSet(0);
         }
     }
 

Reply via email to