Repository: nifi Updated Branches: refs/heads/master 72ccc252f -> 85534ca86
NIFI-916 Refactoring how the closed flag is used in PersistentProvenanceRepository to ensure proper shutdown Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/85534ca8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/85534ca8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/85534ca8 Branch: refs/heads/master Commit: 85534ca860b6e0169a7d66c99ac3f293031ccbbc Parents: 72ccc25 Author: Bryan Bende <[email protected]> Authored: Tue Sep 1 14:34:25 2015 -0400 Committer: Bryan Bende <[email protected]> Committed: Sun Sep 6 13:22:07 2015 -0400 ---------------------------------------------------------------------- .../provenance/PersistentProvenanceRepository.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/85534ca8/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 3497e12..89e1419 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -124,7 +124,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicReference<SortedMap<Long, Path>> idToPathMap = new AtomicReference<>(); private final AtomicBoolean recoveryFinished = new AtomicBoolean(false); - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private volatile long firstEventTimestamp = 0L; // the following are all protected by the lock @@ -630,11 +630,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public synchronized void close() throws IOException { + this.closed.set(true); writeLock.lock(); try { logger.debug("Obtained write lock for close"); - this.closed = true; scheduledExecService.shutdownNow(); rolloverExecutor.shutdownNow(); queryExecService.shutdownNow(); @@ -652,7 +652,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } public boolean isShutdownComplete() { - return this.closed; + return this.closed.get(); } private void persistRecord(final Iterable<ProvenanceEventRecord> records) { @@ -1269,6 +1269,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository + "exceeding the provenance recording rate. Slowing down flow to accommodate"); while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { + // if a shutdown happens while we are in this loop, kill the rollover thread and break + if (this.closed.get()) { + future.cancel(true); + break; + } + if (repoSize > sizeThreshold) { logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events"); purgeOldEvents(); @@ -1397,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository */ File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile); - if ( this.closed ) { + if ( this.closed.get() ) { logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); return null; }
