NIFI-1070: Added detailed debug-level logging about how FileSystemRepository is 
choosing to expire archived data


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aec32a27
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aec32a27
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aec32a27

Branch: refs/heads/NIFI-730
Commit: aec32a277c3f2b707edd20a9eff9c4984f4f28fa
Parents: f8c3377
Author: Mark Payne <[email protected]>
Authored: Mon Oct 26 14:36:03 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Oct 26 14:36:03 2015 -0400

----------------------------------------------------------------------
 .../repository/FileSystemRepository.java        | 26 ++++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aec32a27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 724e26e..72a50ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -87,6 +87,8 @@ public class FileSystemRepository implements ContentRepository {
     public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%");
     private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class);
 
+    private final Logger archiveExpirationLog = LoggerFactory.getLogger(FileSystemRepository.class.getName() + ".archive.expiration");
+
     private final Map<String, Path> containers;
     private final List<String> containerNames;
     private final AtomicLong index;
@@ -151,7 +153,7 @@ public class FileSystemRepository implements ContentRepository {
 
             if (maxArchiveSize == null) {
                 throw new RuntimeException("No value specified for property '"
-                        + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
+                    + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
             }
 
             if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
@@ -185,7 +187,7 @@ public class FileSystemRepository implements 
ContentRepository {
                 final long maxArchiveBytes = (long) (capacity * (1D - 
(maxArchiveRatio - 0.02)));
                 minUsableContainerBytesForArchive.put(container.getKey(), 
Long.valueOf(maxArchiveBytes));
                 LOG.info("Maximum Threshold for Container {} set to {} bytes; 
if volume exceeds this size, archived data will be deleted until it no longer 
exceeds this size",
-                        containerName, maxArchiveBytes);
+                    containerName, maxArchiveBytes);
 
                 final long backPressureBytes = (long) 
(Files.getFileStore(container.getValue()).getTotalSpace() * 
archiveBackPressureRatio);
                 final ContainerState containerState = new 
ContainerState(containerName, true, backPressureBytes, capacity);
@@ -620,7 +622,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
         final File file = path.toFile();
         if (!file.delete() && file.exists()) {
-            LOG.warn("Unable to delete {} at path {}", new Object[]{claim, 
path});
+            LOG.warn("Unable to delete {} at path {}", new Object[] {claim, 
path});
             return false;
         }
 
@@ -1051,7 +1053,7 @@ public class FileSystemRepository implements 
ContentRepository {
                                     break;
                                 } else {
                                     LOG.warn("Failed to clean up {} because 
old claims aren't being cleaned up fast enough. "
-                                            + "This Content Claim will remain 
in the Content Repository until NiFi is restarted, at which point it will be 
cleaned up", claim);
+                                        + "This Content Claim will remain in 
the Content Repository until NiFi is restarted, at which point it will be 
cleaned up", claim);
                                 }
                             }
                         } catch (final InterruptedException ie) {
@@ -1187,6 +1189,7 @@ public class FileSystemRepository implements 
ContentRepository {
     }
 
     private long destroyExpiredArchives(final String containerName, final Path 
container) throws IOException {
+        archiveExpirationLog.debug("Destroying Expired Archives for Container 
{}", containerName);
         final List<ArchiveInfo> notYetExceedingThreshold = new ArrayList<>();
         final long removalTimeThreshold = System.currentTimeMillis() - 
maxArchiveMillis;
         long oldestArchiveDateFound = System.currentTimeMillis();
@@ -1194,6 +1197,7 @@ public class FileSystemRepository implements 
ContentRepository {
         // determine how much space we must have in order to stop deleting old 
data
         final Long minRequiredSpace = 
minUsableContainerBytesForArchive.get(containerName);
         if (minRequiredSpace == null) {
+            archiveExpirationLog.debug("Could not determine minimum required 
space so will not destroy any archived data");
             return -1L;
         }
 
@@ -1204,6 +1208,7 @@ public class FileSystemRepository implements 
ContentRepository {
         final long startNanos = System.nanoTime();
         final long toFree = minRequiredSpace - usableSpace;
         final BlockingQueue<ArchiveInfo> fileQueue = 
archivedFiles.get(containerName);
+        archiveExpirationLog.info("Currently {} bytes free for Container {}; 
requirement is {} byte free, so need to free {} bytes", usableSpace, 
containerName, minRequiredSpace, toFree);
 
         ArchiveInfo toDelete;
         int deleteCount = 0;
@@ -1217,7 +1222,7 @@ public class FileSystemRepository implements 
ContentRepository {
                 // In order to accomplish this, we just peek at the head and 
check if it should be deleted.
                 // If so, then we call poll() to remove it
                 if (freed < toFree || getLastModTime(toDelete.toPath()) < 
removalTimeThreshold) {
-                    toDelete = fileQueue.poll();   // remove the head of the 
queue, which is already stored in 'toDelete'
+                    toDelete = fileQueue.poll(); // remove the head of the 
queue, which is already stored in 'toDelete'
                     Files.deleteIfExists(toDelete.toPath());
                     containerState.decrementArchiveCount();
                     LOG.debug("Deleted archived ContentClaim with ID {} from 
Container {} because the archival size was exceeding the max configured size", 
toDelete.getName(), containerName);
@@ -1229,9 +1234,12 @@ public class FileSystemRepository implements 
ContentRepository {
                 if (freed >= toFree) {
                     // If the last mod time indicates that it should be 
removed, just continue loop.
                     if (deleteBasedOnTimestamp(fileQueue, 
removalTimeThreshold)) {
+                        archiveExpirationLog.debug("Freed enough space ({} 
bytes freed, needed to free {} bytes) but will continue to expire data based on 
timestamp", freed, toFree);
                         continue;
                     }
 
+                    archiveExpirationLog.debug("Freed enough space ({} bytes 
freed, needed to free {} bytes). Finished expiring data", freed, toFree);
+
                     final ArchiveInfo archiveInfo = fileQueue.peek();
                     final long oldestArchiveDate = archiveInfo == null ? 
System.currentTimeMillis() : getLastModTime(archiveInfo.toPath());
 
@@ -1256,6 +1264,7 @@ public class FileSystemRepository implements 
ContentRepository {
         }
 
         // Go through each container and grab the archived data into a List
+        archiveExpirationLog.debug("Searching for more archived data to 
expire");
         final StopWatch stopWatch = new StopWatch(true);
         for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) {
             final Path sectionContainer = container.resolve(String.valueOf(i));
@@ -1278,7 +1287,7 @@ public class FileSystemRepository implements 
ContentRepository {
                                 Files.deleteIfExists(file);
                                 containerState.decrementArchiveCount();
                                 LOG.debug("Deleted archived ContentClaim with 
ID {} from Container {} because it was older than the configured max archival 
duration",
-                                        file.toFile().getName(), 
containerName);
+                                    file.toFile().getName(), containerName);
                             } catch (final IOException ioe) {
                                 LOG.warn("Failed to remove archived 
ContentClaim with ID {} from Container {} due to {}", file.toFile().getName(), 
containerName, ioe.toString());
                                 if (LOG.isDebugEnabled()) {
@@ -1312,6 +1321,7 @@ public class FileSystemRepository implements 
ContentRepository {
         final long sortRemainingMillis = 
stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteExpiredMillis;
 
         // Delete the oldest data
+        archiveExpirationLog.debug("Deleting data based on timestamp");
         final Iterator<ArchiveInfo> itr = notYetExceedingThreshold.iterator();
         int counter = 0;
         while (itr.hasNext()) {
@@ -1325,7 +1335,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
                 // Check if we've freed enough space every 25 files that we 
destroy
                 if (++counter % 25 == 0) {
-                    if (getContainerUsableSpace(containerName) > 
minRequiredSpace) {  // check if we can stop now
+                    if (getContainerUsableSpace(containerName) > 
minRequiredSpace) { // check if we can stop now
                         LOG.debug("Finished cleaning up archive for Container 
{}", containerName);
                         break;
                     }
@@ -1360,7 +1370,7 @@ public class FileSystemRepository implements ContentRepository {
 
         final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
         LOG.debug("Oldest Archive Date for Container {} is {}; delete expired = {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
-                containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
+            containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
         return oldestContainerArchive;
     }
 

Reply via email to