NIFI-450: Catch Throwable from all implementations of Runnable in the 
FileSystemRepository; these are expected to always be running, so if anything 
odd like an OutOfMemoryError occurs, this needs to be caught rather than 
allowing the thread to die


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/54f3476a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/54f3476a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/54f3476a

Branch: refs/heads/develop
Commit: 54f3476a4cc975f52d543a53f1f9aec97ba3b858
Parents: d640804
Author: Mark Payne <[email protected]>
Authored: Mon Mar 23 15:19:18 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Mar 23 15:19:18 2015 -0400

----------------------------------------------------------------------
 .../repository/FileSystemRepository.java        | 78 ++++++++++----------
 1 file changed, 41 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54f3476a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index a3e24c4..2c3751b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -791,34 +791,38 @@ public class FileSystemRepository implements 
ContentRepository {
 
         @Override
         public void run() {
-            // Get all of the Destructable Claims and bin them based on their 
Container. We do this
-            // because the Container generally maps to a physical partition on 
the disk, so we want a few
-            // different threads hitting the different partitions but don't 
want multiple threads hitting
-            // the same partition.
-            final List<ContentClaim> toDestroy = new ArrayList<>();
-            while (true) {
-                toDestroy.clear();
-                contentClaimManager.drainDestructableClaims(toDestroy, 10000);
-                if (toDestroy.isEmpty()) {
-                    return;
-                }
-
-                for (final ContentClaim claim : toDestroy) {
-                    final String container = claim.getContainer();
-                    final BlockingQueue<ContentClaim> claimQueue = 
reclaimable.get(container);
-
-                    try {
-                        while (true) {
-                            if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) 
{
-                                break;
-                            } else {
-                                LOG.warn("Failed to clean up {} because old 
claims aren't being cleaned up fast enough. This Content Claim will remain in 
the Content Repository until NiFi is restarted, at which point it will be 
cleaned up", claim);
+            try {
+                // Get all of the Destructable Claims and bin them based on 
their Container. We do this
+                // because the Container generally maps to a physical 
partition on the disk, so we want a few
+                // different threads hitting the different partitions but 
don't want multiple threads hitting
+                // the same partition.
+                final List<ContentClaim> toDestroy = new ArrayList<>();
+                while (true) {
+                    toDestroy.clear();
+                    contentClaimManager.drainDestructableClaims(toDestroy, 
10000);
+                    if (toDestroy.isEmpty()) {
+                        return;
+                    }
+    
+                    for (final ContentClaim claim : toDestroy) {
+                        final String container = claim.getContainer();
+                        final BlockingQueue<ContentClaim> claimQueue = 
reclaimable.get(container);
+    
+                        try {
+                            while (true) {
+                                if (claimQueue.offer(claim, 10, 
TimeUnit.MINUTES)) {
+                                    break;
+                                } else {
+                                    LOG.warn("Failed to clean up {} because 
old claims aren't being cleaned up fast enough. This Content Claim will remain 
in the Content Repository until NiFi is restarted, at which point it will be 
cleaned up", claim);
+                                }
                             }
+                        } catch (final InterruptedException ie) {
+                            LOG.warn("Failed to clean up {} because thread was 
interrupted", claim);
                         }
-                    } catch (final InterruptedException ie) {
-                        LOG.warn("Failed to clean up {} because thread was 
interrupted", claim);
                     }
                 }
+            } catch (final Throwable t) {
+                LOG.error("Failed to cleanup content claims due to {}", t);
             }
         }
     }
@@ -1198,23 +1202,23 @@ public class FileSystemRepository implements 
ContentRepository {
 
         @Override
         public void run() {
-            if (oldestArchiveDate.get() > (System.currentTimeMillis() - 
maxArchiveMillis)) {
-                final Long minRequiredSpace = 
minUsableContainerBytesForArchive.get(containerName);
-                if (minRequiredSpace == null) {
-                    return;
-                }
-
-                try {
-                    final long usableSpace = 
getContainerUsableSpace(containerName);
-                    if (usableSpace > minRequiredSpace) {
+            try {
+                if (oldestArchiveDate.get() > (System.currentTimeMillis() - 
maxArchiveMillis)) {
+                    final Long minRequiredSpace = 
minUsableContainerBytesForArchive.get(containerName);
+                    if (minRequiredSpace == null) {
                         return;
                     }
-                } catch (final Exception e) {
-                    LOG.error("Failed to determine space available in 
container {}; will attempt to cleanup archive", containerName);
+    
+                    try {
+                        final long usableSpace = 
getContainerUsableSpace(containerName);
+                        if (usableSpace > minRequiredSpace) {
+                            return;
+                        }
+                    } catch (final Exception e) {
+                        LOG.error("Failed to determine space available in 
container {}; will attempt to cleanup archive", containerName);
+                    }
                 }
-            }
 
-            try {
                 Thread.currentThread().setName("Cleanup Archive for " + 
containerName);
                 final long oldestContainerArchive;
 

Reply via email to