Repository: nifi
Updated Branches:
  refs/heads/master 72ccc252f -> 85534ca86


NIFI-916 Refactoring how the closed flag is used in PersistentProvenanceRepository to ensure proper shutdown


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/85534ca8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/85534ca8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/85534ca8

Branch: refs/heads/master
Commit: 85534ca860b6e0169a7d66c99ac3f293031ccbbc
Parents: 72ccc25
Author: Bryan Bende <[email protected]>
Authored: Tue Sep 1 14:34:25 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Sun Sep 6 13:22:07 2015 -0400

----------------------------------------------------------------------
 .../provenance/PersistentProvenanceRepository.java    | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/85534ca8/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 3497e12..89e1419 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -124,7 +124,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     private final AtomicReference<SortedMap<Long, Path>> idToPathMap = new AtomicReference<>();
     private final AtomicBoolean recoveryFinished = new AtomicBoolean(false);
 
-    private volatile boolean closed = false;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
     private volatile long firstEventTimestamp = 0L;
 
     // the following are all protected by the lock
@@ -630,11 +630,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     @Override
     public synchronized void close() throws IOException {
+        this.closed.set(true);
         writeLock.lock();
         try {
             logger.debug("Obtained write lock for close");
 
-            this.closed = true;
             scheduledExecService.shutdownNow();
             rolloverExecutor.shutdownNow();
             queryExecService.shutdownNow();
@@ -652,7 +652,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     public boolean isShutdownComplete() {
-        return this.closed;
+        return this.closed.get();
     }
 
     private void persistRecord(final Iterable<ProvenanceEventRecord> records) {
@@ -1269,6 +1269,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                         + "exceeding the provenance recording rate. Slowing down flow to accommodate");
 
                 while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
+                    // if a shutdown happens while we are in this loop, kill the rollover thread and break
+                    if (this.closed.get()) {
+                        future.cancel(true);
+                        break;
+                    }
+
                     if (repoSize > sizeThreshold) {
                         logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
                         purgeOldEvents();
@@ -1397,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
      */
     File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException {
         logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
-        if ( this.closed ) {
+        if ( this.closed.get() ) {
             logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile);
             return null;
         }

Reply via email to